This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 07e1518  Support end to end encryption/decryption (#445)
07e1518 is described below

commit 07e1518dcf196563a3e9b0a033cc27499d913677
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Dec 22 10:26:59 2025 +0800

    Support end to end encryption/decryption (#445)
---
 binding.gyp                         |   3 +-
 index.d.ts                          |  31 +++++
 index.js                            |   1 +
 src/ConsumerConfig.cc               |  24 ++--
 src/CryptoKeyReader.cc              | 151 ++++++++++++++++++++++++
 src/{addon.cc => CryptoKeyReader.h} |  33 +++---
 src/Message.cc                      |  66 ++++++++++-
 src/Message.h                       |   1 +
 src/ProducerConfig.cc               |  38 ++++--
 src/addon.cc                        |   2 +
 tests/encryption.test.js            | 223 ++++++++++++++++++++++++++++++++++++
 11 files changed, 537 insertions(+), 36 deletions(-)

diff --git a/binding.gyp b/binding.gyp
index e5116af..563dce5 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -41,7 +41,8 @@
         "src/ConsumerConfig.cc",
         "src/Reader.cc",
         "src/ReaderConfig.cc",
-        "src/ThreadSafeDeferred.cc"
+        "src/ThreadSafeDeferred.cc",
+        "src/CryptoKeyReader.cc"
       ],
       'conditions': [
         ['OS=="mac"', {
diff --git a/index.d.ts b/index.d.ts
index 46ac69a..b85b623 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -65,6 +65,8 @@ export interface ProducerConfig {
   properties?: { [key: string]: string };
   publicKeyPath?: string;
   encryptionKey?: string;
+  encryptionKeys?: string[];
+  cryptoKeyReader?: CryptoKeyReader;
   cryptoFailureAction?: ProducerCryptoFailureAction;
   chunkingEnabled?: boolean;
   schema?: SchemaInfo;
@@ -99,6 +101,7 @@ export interface ConsumerConfig {
   listener?: (message: Message, consumer: Consumer) => void;
   readCompacted?: boolean;
   privateKeyPath?: string;
+  cryptoKeyReader?: CryptoKeyReader;
   cryptoFailureAction?: ConsumerCryptoFailureAction;
   maxPendingChunkedMessage?: number;
   autoAckOldestChunkedMessageOnQueueFull?: number;
@@ -171,6 +174,7 @@ export class Message {
   getPartitionKey(): string;
   getOrderingKey(): string;
   getProducerName(): string;
+  getEncryptionContext(): EncryptionContext | null;
 }
 
 export class MessageId {
@@ -198,6 +202,22 @@ export interface TopicMetadata {
  */
 export type MessageRouter = (message: Message, topicMetadata: TopicMetadata) 
=> number;
 
+/**
+ * One entry of `EncryptionContext.keys`, as produced by
+ * `Message.getEncryptionContext()`.
+ */
+export interface EncryptionKey {
+  /** Name of the key, copied from the native key entry. */
+  key: string;
+  /**
+   * Raw bytes copied from the native key entry's value.
+   * NOTE(review): presumably the encrypted data key — confirm against the
+   * Pulsar C++ client documentation.
+   */
+  value: Buffer;
+  /** String key/value pairs copied from the native key entry's metadata. */
+  metadata: { [key: string]: string };
+}
+
+/**
+ * Encryption details attached to a received message, as returned by
+ * `Message.getEncryptionContext()`. That method returns `null` instead of
+ * this object when the message carries no encryption context.
+ */
+export interface EncryptionContext {
+  /** Key entries attached to the message. */
+  keys: EncryptionKey[];
+  /** Raw bytes of the native context's `param` value. */
+  param: Buffer;
+  /** Algorithm string reported by the native context (may be empty). */
+  algorithm: string;
+  /**
+   * Compression type reported by the native context. Values the binding does
+   * not recognise are reported as `'None'`.
+   */
+  compressionType: CompressionType;
+  /** Uncompressed message size reported by the native context. */
+  uncompressedMessageSize: number;
+  /** Batch size reported by the native context. */
+  batchSize: number;
+  /** Whether the native context reports that decryption failed. */
+  isDecryptionFailed: boolean;
+}
+
 export interface SchemaInfo {
   schemaType: SchemaType;
   name?: string;
@@ -285,6 +305,16 @@ export class AuthenticationBasic {
   });
 }
 
+/**
+ * Value returned from `CryptoKeyReader.getPublicKey` / `getPrivateKey`.
+ */
+export interface EncryptionKeyInfo {
+  /** Key material. The native layer only reads it when it is a `Buffer`. */
+  key: Buffer;
+  /**
+   * Metadata to associate with the key. The native layer only reads it when
+   * it is an object, and converts every property value to a string.
+   */
+  metadata: { [key: string]: string };
+}
+
+/**
+ * Base class for user-supplied key readers. Extend it and implement
+ * `getPublicKey` / `getPrivateKey`; the native layer looks both methods up on
+ * the instance by name.
+ *
+ * The return value is used directly, so both methods must answer
+ * synchronously. Returning `null` (or any non-object value), or throwing, is
+ * reported to the client as a crypto error.
+ */
+export class CryptoKeyReader {
+  /**
+   * @param keyName Name of the key being requested.
+   * @param metadata String key/value pairs supplied by the native client.
+   * @returns The key info, or `null` when the key cannot be provided.
+   */
+  getPublicKey(
+    keyName: string,
+    metadata: { [key: string]: string }
+  ): EncryptionKeyInfo | null;
+  /**
+   * @param keyName Name of the key being requested.
+   * @param metadata String key/value pairs supplied by the native client.
+   * @returns The key info, or `null` when the key cannot be provided.
+   */
+  getPrivateKey(
+    keyName: string,
+    metadata: { [key: string]: string }
+  ): EncryptionKeyInfo | null;
+}
+
 export enum LogLevel {
   DEBUG = 0,
   INFO = 1,
@@ -303,6 +333,7 @@ export type HashingScheme =
   'JavaStringHash';
 
 export type CompressionType =
+  'None' |
   'Zlib' |
   'LZ4' |
   'ZSTD' |
diff --git a/index.js b/index.js
index ddbb997..d909251 100644
--- a/index.js
+++ b/index.js
@@ -37,6 +37,7 @@ const Pulsar = {
   Client,
   Message: PulsarBinding.Message,
   MessageId: PulsarBinding.MessageId,
+  CryptoKeyReader: PulsarBinding.CryptoKeyReader,
   AuthenticationTls,
   AuthenticationAthenz,
   AuthenticationToken,
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 7b2b61c..e7419c6 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -20,6 +20,7 @@
 #include "ConsumerConfig.h"
 #include "Consumer.h"
 #include "SchemaInfo.h"
+#include "CryptoKeyReader.h"
 #include "Message.h"
 #include "pulsar/ConsumerConfiguration.h"
 #include <pulsar/c/consumer_configuration.h>
@@ -60,6 +61,7 @@ static const std::string CFG_KEY_SHARED_POLICY = 
"keySharedPolicy";
 static const std::string CFG_KEY_SHARED_POLICY_MODE = "keyShareMode";
 static const std::string CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER = 
"allowOutOfOrderDelivery";
 static const std::string CFG_KEY_SHARED_POLICY_STICKY_RANGES = "stickyRanges";
+static const std::string CFG_CRYPTO_KEY_READER = "cryptoKeyReader";
 
 static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
     {"Exclusive", pulsar_ConsumerExclusive},
@@ -249,13 +251,21 @@ void 
ConsumerConfig::InitConfig(std::shared_ptr<ThreadSafeDeferred> deferred,
     std::string privateKeyPath = 
consumerConfig.Get(CFG_PRIVATE_KEY_PATH).ToString().Utf8Value();
     pulsar_consumer_configuration_set_default_crypto_key_reader(
         this->cConsumerConfig.get(), publicKeyPath.c_str(), 
privateKeyPath.c_str());
-    if (consumerConfig.Has(CFG_CRYPTO_FAILURE_ACTION) &&
-        consumerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).IsString()) {
-      std::string cryptoFailureAction = 
consumerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).ToString().Utf8Value();
-      if (CONSUMER_CRYPTO_FAILURE_ACTION.count(cryptoFailureAction)) {
-        pulsar_consumer_configuration_set_crypto_failure_action(
-            this->cConsumerConfig.get(), 
CONSUMER_CRYPTO_FAILURE_ACTION.at(cryptoFailureAction));
-      }
+  }
+
+  if (consumerConfig.Has(CFG_CRYPTO_KEY_READER) && 
consumerConfig.Get(CFG_CRYPTO_KEY_READER).IsObject()) {
+    Napi::Object cryptoKeyReaderObj = 
consumerConfig.Get(CFG_CRYPTO_KEY_READER).As<Napi::Object>();
+    CryptoKeyReader *cryptoKeyReader = 
Napi::ObjectWrap<CryptoKeyReader>::Unwrap(cryptoKeyReaderObj);
+    this->cConsumerConfig.get()->consumerConfiguration.setCryptoKeyReader(
+        cryptoKeyReader->GetCCryptoKeyReader());
+  }
+
+  if (consumerConfig.Has(CFG_CRYPTO_FAILURE_ACTION) &&
+      consumerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).IsString()) {
+    std::string cryptoFailureAction = 
consumerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).ToString().Utf8Value();
+    if (CONSUMER_CRYPTO_FAILURE_ACTION.count(cryptoFailureAction)) {
+      pulsar_consumer_configuration_set_crypto_failure_action(
+          this->cConsumerConfig.get(), 
CONSUMER_CRYPTO_FAILURE_ACTION.at(cryptoFailureAction));
     }
   }
 
diff --git a/src/CryptoKeyReader.cc b/src/CryptoKeyReader.cc
new file mode 100644
index 0000000..4cf09ad
--- /dev/null
+++ b/src/CryptoKeyReader.cc
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "CryptoKeyReader.h"
+#include <pulsar/Result.h>
+#include <pulsar/EncryptionKeyInfo.h>
+#include <thread>
+#include <future>
+
+// Adapts a JavaScript object to the pulsar::CryptoKeyReader interface.
+//
+// Key lookups are forwarded to the JS object's "getPublicKey" /
+// "getPrivateKey" methods. When a lookup arrives on the thread that
+// constructed the wrapper, the JS method is called directly; on any other
+// thread the call is marshaled through a Napi::ThreadSafeFunction and the
+// calling thread blocks until the JS call has produced a result.
+class CryptoKeyReaderWrapper : public pulsar::CryptoKeyReader {
+ public:
+  // Captures the constructing thread's id, takes a reference on `jsObject`
+  // and creates the thread-safe function used for cross-thread calls. The
+  // JS function handed to the thread-safe function is a no-op; the real work
+  // is done by the lambda passed to BlockingCall() in executeCallback().
+  CryptoKeyReaderWrapper(const Napi::Object& jsObject) : 
mainThreadId_(std::this_thread::get_id()) {
+    jsObject_.Reset(jsObject, 1);
+    tsfn_ = Napi::ThreadSafeFunction::New(
+        jsObject.Env(), Napi::Function::New(jsObject.Env(), [](const 
Napi::CallbackInfo& info) {}), jsObject,
+        "CryptoKeyReader", 0, 1);
+  }
+
+  // Releases this wrapper's use of the thread-safe function.
+  // NOTE(review): jsObject_ is also destroyed here; confirm the wrapper is
+  // never destroyed on a non-JS thread.
+  ~CryptoKeyReaderWrapper() { tsfn_.Release(); }
+
+  // Resolves a public key by calling the JS object's "getPublicKey".
+  pulsar::Result getPublicKey(const std::string& keyName, 
std::map<std::string, std::string>& metadata,
+                              pulsar::EncryptionKeyInfo& encKeyInfo) const 
override {
+    return executeCallback("getPublicKey", keyName, metadata, encKeyInfo);
+  }
+
+  // Resolves a private key by calling the JS object's "getPrivateKey".
+  pulsar::Result getPrivateKey(const std::string& keyName, 
std::map<std::string, std::string>& metadata,
+                               pulsar::EncryptionKeyInfo& encKeyInfo) const 
override {
+    return executeCallback("getPrivateKey", keyName, metadata, encKeyInfo);
+  }
+
+ private:
+  // Reference to the user's JS key reader instance.
+  Napi::ObjectReference jsObject_;
+  // Used to run callJsMethod() on the JS thread from other threads.
+  Napi::ThreadSafeFunction tsfn_;
+  // Id of the thread that constructed this wrapper.
+  std::thread::id mainThreadId_;
+
+  // Copies the JS result object into `info`: "key" is copied only when it is
+  // a Buffer, "metadata" only when it is an object (values are stringified).
+  // Missing or mistyped properties are skipped without reporting an error.
+  static void parseEncryptionKeyInfo(const Napi::Object& obj, 
pulsar::EncryptionKeyInfo& info) {
+    if (obj.Has("key") && obj.Get("key").IsBuffer()) {
+      Napi::Buffer<char> keyBuf = obj.Get("key").As<Napi::Buffer<char>>();
+      info.setKey(std::string(keyBuf.Data(), keyBuf.Length()));
+    }
+    if (obj.Has("metadata") && obj.Get("metadata").IsObject()) {
+      std::map<std::string, std::string> metadata;
+      Napi::Object metaObj = obj.Get("metadata").As<Napi::Object>();
+      Napi::Array keys = metaObj.GetPropertyNames();
+      for (uint32_t i = 0; i < keys.Length(); i++) {
+        std::string k = keys.Get(i).ToString().Utf8Value();
+        std::string v = metaObj.Get(k).ToString().Utf8Value();
+        metadata[k] = v;
+      }
+      info.setMetadata(metadata);
+    }
+  }
+
+  // Calls `method(keyName, metadata)` on the JS object and fills `encKeyInfo`
+  // from the returned object.
+  //
+  // Returns ResultOk when the call returns an object, and ResultCryptoError
+  // when the reference is empty, the method is missing or not a function,
+  // the call throws, or the return value is not an object.
+  // NOTE(review): an object without a Buffer "key" still yields ResultOk with
+  // the key left unset — confirm this is intended.
+  pulsar::Result callJsMethod(Napi::Env env, const std::string& method, const 
std::string& keyName,
+                              const std::map<std::string, std::string>& 
metadata,
+                              pulsar::EncryptionKeyInfo& encKeyInfo) const {
+    Napi::HandleScope scope(env);
+
+    if (jsObject_.IsEmpty()) {
+      return pulsar::Result::ResultCryptoError;
+    }
+    Napi::Object obj = jsObject_.Value();
+
+    if (!obj.Has(method)) {
+      return pulsar::Result::ResultCryptoError;
+    }
+    Napi::Value funcVal = obj.Get(method);
+    if (!funcVal.IsFunction()) {
+      return pulsar::Result::ResultCryptoError;
+    }
+    Napi::Function func = funcVal.As<Napi::Function>();
+
+    // Expose the native metadata map to JS as a plain object.
+    Napi::Object metadataObj = Napi::Object::New(env);
+    for (const auto& kv : metadata) {
+      metadataObj.Set(kv.first, kv.second);
+    }
+
+    try {
+      Napi::Value result = func.Call(obj, {Napi::String::New(env, keyName), 
metadataObj});
+      if (result.IsObject()) {
+        parseEncryptionKeyInfo(result.As<Napi::Object>(), encKeyInfo);
+        return pulsar::Result::ResultOk;
+      }
+    } catch (const Napi::Error& e) {
+      return pulsar::Result::ResultCryptoError;
+    } catch (...) {
+      return pulsar::Result::ResultCryptoError;
+    }
+    return pulsar::Result::ResultCryptoError;
+  }
+
+  // Runs callJsMethod() either inline (when already on the constructing
+  // thread) or via the thread-safe function, blocking until it completes.
+  pulsar::Result executeCallback(const std::string& method, const std::string& 
keyName,
+                                 std::map<std::string, std::string>& metadata,
+                                 pulsar::EncryptionKeyInfo& encKeyInfo) const {
+    if (std::this_thread::get_id() == mainThreadId_) {
+      return callJsMethod(jsObject_.Env(), method, keyName, metadata, 
encKeyInfo);
+    } else {
+      auto promise = std::make_shared<std::promise<pulsar::Result>>();
+      auto future = promise->get_future();
+
+      // The arguments are captured by reference; this thread waits on the
+      // future below, so they stay alive while the lambda runs.
+      napi_status status = tsfn_.BlockingCall([this, promise, &method, 
&keyName, &metadata, &encKeyInfo](
+                                                  Napi::Env env, 
Napi::Function jsCallback) {
+        promise->set_value(callJsMethod(env, method, keyName, metadata, 
encKeyInfo));
+      });
+
+      if (status != napi_ok) {
+        return pulsar::Result::ResultCryptoError;
+      }
+
+      // NOTE(review): this blocks until the queued lambda has run; confirm
+      // the lambda is always invoked once BlockingCall() returns napi_ok.
+      future.wait();
+      return future.get();
+    }
+  }
+};
+
+// Persistent handle to the JS constructor; populated by Init().
+Napi::FunctionReference CryptoKeyReader::constructor;
+
+// Defines the JS class "CryptoKeyReader" (with no native methods) and
+// attaches it to `exports` under the same name.
+void CryptoKeyReader::Init(Napi::Env env, Napi::Object exports) {
+  Napi::HandleScope handleScope(env);
+
+  static const char *const kClassName = "CryptoKeyReader";
+  Napi::Function classCtor = DefineClass(env, kClassName, {});
+  exports.Set(kClassName, classCtor);
+
+  // Keep the constructor alive and skip its destruction.
+  constructor = Napi::Persistent(classCtor);
+  constructor.SuppressDestruct();
+}
+
+// Forwards the JS constructor call to ObjectWrap; no native state is set up.
+CryptoKeyReader::CryptoKeyReader(const Napi::CallbackInfo& info)
+    : Napi::ObjectWrap<CryptoKeyReader>(info) {}
+
+// The class holds no per-instance native resources, so nothing to release.
+CryptoKeyReader::~CryptoKeyReader() = default;
+
+// Wraps this JS instance in a native pulsar::CryptoKeyReader adapter.
+// A new CryptoKeyReaderWrapper is allocated on every call.
+std::shared_ptr<pulsar::CryptoKeyReader> CryptoKeyReader::GetCCryptoKeyReader() {
+  Napi::Object self = Value();
+  std::shared_ptr<pulsar::CryptoKeyReader> reader = std::make_shared<CryptoKeyReaderWrapper>(self);
+  return reader;
+}
diff --git a/src/addon.cc b/src/CryptoKeyReader.h
similarity index 62%
copy from src/addon.cc
copy to src/CryptoKeyReader.h
index fa26ae0..19d9155 100644
--- a/src/addon.cc
+++ b/src/CryptoKeyReader.h
@@ -17,23 +17,22 @@
  * under the License.
  */
 
-#include "Message.h"
-#include "MessageId.h"
-#include "Authentication.h"
-#include "Producer.h"
-#include "Consumer.h"
-#include "Client.h"
-#include "Reader.h"
+#ifndef CRYPTO_KEY_READER_H
+#define CRYPTO_KEY_READER_H
+
 #include <napi.h>
+#include <pulsar/CryptoKeyReader.h>
+
+class CryptoKeyReader : public Napi::ObjectWrap<CryptoKeyReader> {
+ public:
+  static void Init(Napi::Env env, Napi::Object exports);
+  static Napi::Object NewInstance(const Napi::CallbackInfo &info);
+  CryptoKeyReader(const Napi::CallbackInfo &info);
+  ~CryptoKeyReader();
+  std::shared_ptr<pulsar::CryptoKeyReader> GetCCryptoKeyReader();
 
-Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
-  Message::Init(env, exports);
-  MessageId::Init(env, exports);
-  Authentication::Init(env, exports);
-  Producer::Init(env, exports);
-  Consumer::Init(env, exports);
-  Reader::Init(env, exports);
-  return Client::Init(env, exports);
-}
+ private:
+  static Napi::FunctionReference constructor;
+};
 
-NODE_API_MODULE(NODE_GYP_MODULE_NAME, InitAll)
+#endif
diff --git a/src/Message.cc b/src/Message.cc
index 8b1d081..58909bd 100644
--- a/src/Message.cc
+++ b/src/Message.cc
@@ -20,6 +20,15 @@
 #include "Message.h"
 #include "MessageId.h"
 #include <pulsar/c/message.h>
+#include <pulsar/Message.h>
+#include <pulsar/MessageBuilder.h>
+#include <pulsar/EncryptionContext.h>
+#include <map>
+
+// Local definition of the struct behind the C message handle, so that this
+// file can reach the underlying C++ pulsar::Message through `.message`
+// (see GetEncryptionContext).
+// NOTE(review): presumably this must match the member layout of the Pulsar
+// C client's own definition — confirm whenever the client library is bumped.
+struct _pulsar_message {
+  pulsar::MessageBuilder builder;
+  pulsar::Message message;
+};
 
 static const std::string CFG_DATA = "data";
 static const std::string CFG_PROPS = "properties";
@@ -32,6 +41,12 @@ static const std::string CFG_DELIVER_AT = "deliverAt";
 static const std::string CFG_DISABLE_REPLICATION = "disableReplication";
 static const std::string CFG_ORDERING_KEY = "orderingKey";
 
+// Maps the native compression enum to the string names exposed to JavaScript
+// as `compressionType` in the encryption context.
+static const std::map<pulsar::CompressionType, std::string> COMPRESSION_TYPE_MAP = {
+    {pulsar::CompressionNone, "None"},
+    {pulsar::CompressionZLib, "Zlib"},
+    {pulsar::CompressionLZ4, "LZ4"},
+    {pulsar::CompressionZSTD, "ZSTD"},
+    {pulsar::CompressionSNAPPY, "SNAPPY"},
+};
+
 Napi::FunctionReference Message::constructor;
 
 Napi::Object Message::Init(Napi::Env env, Napi::Object exports) {
@@ -47,7 +62,8 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object 
exports) {
        InstanceMethod("getRedeliveryCount", &Message::GetRedeliveryCount),
        InstanceMethod("getPartitionKey", &Message::GetPartitionKey),
        InstanceMethod("getOrderingKey", &Message::GetOrderingKey),
-       InstanceMethod("getProducerName", &Message::GetProducerName)});
+       InstanceMethod("getProducerName", &Message::GetProducerName),
+       InstanceMethod("getEncryptionContext", 
&Message::GetEncryptionContext)});
 
   constructor = Napi::Persistent(func);
   constructor.SuppressDestruct();
@@ -156,6 +172,54 @@ Napi::Value Message::GetProducerName(const 
Napi::CallbackInfo &info) {
   return Napi::String::New(env, 
pulsar_message_get_producer_name(this->cMessage.get()));
 }
 
+// Builds the JS EncryptionContext object for this message. Returns JS null
+// when the native message is missing or carries no encryption context.
+Napi::Value Message::GetEncryptionContext(const Napi::CallbackInfo &info) {
+  Napi::Env env = info.Env();
+  if (!ValidateCMessage(env)) {
+    return env.Null();
+  }
+
+  // getEncryptionContext returns std::optional<const EncryptionContext*>;
+  // an empty optional and a null pointer are both reported as JS null.
+  auto maybeCtx = this->cMessage.get()->message.getEncryptionContext();
+  if (!maybeCtx || *maybeCtx == nullptr) {
+    return env.Null();
+  }
+  const pulsar::EncryptionContext &ctx = **maybeCtx;
+
+  // Convert every native key entry into a { key, value, metadata } object.
+  Napi::Array keyList = Napi::Array::New(env);
+  uint32_t index = 0;
+  for (const auto &entry : ctx.keys()) {
+    Napi::Object metadata = Napi::Object::New(env);
+    for (const auto &kv : entry.metadata) {
+      metadata.Set(kv.first, Napi::String::New(env, kv.second));
+    }
+
+    Napi::Object jsKey = Napi::Object::New(env);
+    jsKey.Set("key", Napi::String::New(env, entry.key));
+    jsKey.Set("value", Napi::Buffer<char>::Copy(env, entry.value.c_str(), entry.value.length()));
+    jsKey.Set("metadata", metadata);
+    keyList.Set(index++, jsKey);
+  }
+
+  // Unknown compression values fall back to "None".
+  const auto found = COMPRESSION_TYPE_MAP.find(ctx.compressionType());
+  const std::string compressionName = (found == COMPRESSION_TYPE_MAP.end()) ? "None" : found->second;
+
+  const auto &param = ctx.param();
+
+  Napi::Object result = Napi::Object::New(env);
+  result.Set("keys", keyList);
+  result.Set("param", Napi::Buffer<char>::Copy(env, param.c_str(), param.length()));
+  result.Set("algorithm", Napi::String::New(env, ctx.algorithm()));
+  result.Set("compressionType", Napi::String::New(env, compressionName));
+  result.Set("uncompressedMessageSize", Napi::Number::New(env, ctx.uncompressedMessageSize()));
+  result.Set("batchSize", Napi::Number::New(env, ctx.batchSize()));
+  result.Set("isDecryptionFailed", Napi::Boolean::New(env, ctx.isDecryptionFailed()));
+  return result;
+}
+
+
 bool Message::ValidateCMessage(Napi::Env env) {
   if (this->cMessage.get()) {
     return true;
diff --git a/src/Message.h b/src/Message.h
index 417de92..4d8c4aa 100644
--- a/src/Message.h
+++ b/src/Message.h
@@ -47,6 +47,7 @@ class Message : public Napi::ObjectWrap<Message> {
   Napi::Value GetOrderingKey(const Napi::CallbackInfo &info);
   Napi::Value GetProducerName(const Napi::CallbackInfo &info);
   Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info);
+  Napi::Value GetEncryptionContext(const Napi::CallbackInfo &info);
   bool ValidateCMessage(Napi::Env env);
 
   static char **NewStringArray(int size) { return (char **)calloc(sizeof(char 
*), size); }
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 83afb9c..eca62a7 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -18,6 +18,7 @@
  */
 #include "SchemaInfo.h"
 #include "ProducerConfig.h"
+#include "CryptoKeyReader.h"
 #include "Message.h"
 #include <cstdio>
 #include <map>
@@ -45,6 +46,8 @@ static const std::string CFG_SCHEMA = "schema";
 static const std::string CFG_PROPS = "properties";
 static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
 static const std::string CFG_ENCRYPTION_KEY = "encryptionKey";
+static const std::string CFG_ENCRYPTION_KEYS = "encryptionKeys";
+static const std::string CFG_CRYPTO_KEY_READER = "cryptoKeyReader";
 static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
 static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
 static const std::string CFG_ACCESS_MODE = "accessMode";
@@ -67,10 +70,8 @@ static const std::map<std::string, pulsar_hashing_scheme> 
HASHING_SCHEME = {
 };
 
 static std::map<std::string, pulsar_compression_type> COMPRESSION_TYPE = {
-    {"Zlib", pulsar_CompressionZLib},
-    {"LZ4", pulsar_CompressionLZ4},
-    {"ZSTD", pulsar_CompressionZSTD},
-    {"SNAPPY", pulsar_CompressionSNAPPY},
+    {"None", pulsar_CompressionNone}, {"Zlib", pulsar_CompressionZLib},     
{"LZ4", pulsar_CompressionLZ4},
+    {"ZSTD", pulsar_CompressionZSTD}, {"SNAPPY", pulsar_CompressionSNAPPY},
 };
 
 static std::map<std::string, pulsar_producer_crypto_failure_action> 
PRODUCER_CRYPTO_FAILURE_ACTION = {
@@ -239,15 +240,32 @@ ProducerConfig::ProducerConfig(const Napi::Object& 
producerConfig) : topic("") {
       std::string encryptionKey = 
producerConfig.Get(CFG_ENCRYPTION_KEY).ToString().Utf8Value();
       
pulsar_producer_configuration_set_encryption_key(this->cProducerConfig.get(), 
encryptionKey.c_str());
     }
-    if (producerConfig.Has(CFG_CRYPTO_FAILURE_ACTION) &&
-        producerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).IsString()) {
-      std::string cryptoFailureAction = 
producerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).ToString().Utf8Value();
-      if (PRODUCER_CRYPTO_FAILURE_ACTION.count(cryptoFailureAction))
-        pulsar_producer_configuration_set_crypto_failure_action(
-            this->cProducerConfig.get(), 
PRODUCER_CRYPTO_FAILURE_ACTION.at(cryptoFailureAction));
+  }
+
+  if (producerConfig.Has(CFG_ENCRYPTION_KEYS) && 
producerConfig.Get(CFG_ENCRYPTION_KEYS).IsArray()) {
+    Napi::Array keys = 
producerConfig.Get(CFG_ENCRYPTION_KEYS).As<Napi::Array>();
+    for (uint32_t i = 0; i < keys.Length(); i++) {
+      if (keys.Get(i).IsString()) {
+        std::string key = keys.Get(i).ToString().Utf8Value();
+        this->cProducerConfig.get()->conf.addEncryptionKey(key);
+      }
     }
   }
 
+  if (producerConfig.Has(CFG_CRYPTO_KEY_READER) && 
producerConfig.Get(CFG_CRYPTO_KEY_READER).IsObject()) {
+    Napi::Object cryptoKeyReaderObj = 
producerConfig.Get(CFG_CRYPTO_KEY_READER).As<Napi::Object>();
+    CryptoKeyReader* cryptoKeyReader = 
Napi::ObjectWrap<CryptoKeyReader>::Unwrap(cryptoKeyReaderObj);
+    
this->cProducerConfig.get()->conf.setCryptoKeyReader(cryptoKeyReader->GetCCryptoKeyReader());
+  }
+
+  if (producerConfig.Has(CFG_CRYPTO_FAILURE_ACTION) &&
+      producerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).IsString()) {
+    std::string cryptoFailureAction = 
producerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).ToString().Utf8Value();
+    if (PRODUCER_CRYPTO_FAILURE_ACTION.count(cryptoFailureAction))
+      pulsar_producer_configuration_set_crypto_failure_action(
+          this->cProducerConfig.get(), 
PRODUCER_CRYPTO_FAILURE_ACTION.at(cryptoFailureAction));
+  }
+
   if (producerConfig.Has(CFG_CHUNK_ENABLED) && 
producerConfig.Get(CFG_CHUNK_ENABLED).IsBoolean()) {
     bool chunkingEnabled = 
producerConfig.Get(CFG_CHUNK_ENABLED).ToBoolean().Value();
     
pulsar_producer_configuration_set_chunking_enabled(this->cProducerConfig.get(), 
chunkingEnabled);
diff --git a/src/addon.cc b/src/addon.cc
index fa26ae0..3025bce 100644
--- a/src/addon.cc
+++ b/src/addon.cc
@@ -24,6 +24,7 @@
 #include "Consumer.h"
 #include "Client.h"
 #include "Reader.h"
+#include "CryptoKeyReader.h"
 #include <napi.h>
 
 Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
@@ -33,6 +34,7 @@ Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
   Producer::Init(env, exports);
   Consumer::Init(env, exports);
   Reader::Init(env, exports);
+  CryptoKeyReader::Init(env, exports);
   return Client::Init(env, exports);
 }
 
diff --git a/tests/encryption.test.js b/tests/encryption.test.js
new file mode 100644
index 0000000..c6aedf8
--- /dev/null
+++ b/tests/encryption.test.js
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const path = require('path');
+const fs = require('fs');
+const Pulsar = require('../index');
+
+class MyCryptoKeyReader extends Pulsar.CryptoKeyReader {
+  constructor(publicKeys, privateKeys) {
+    super();
+    this.publicKeys = publicKeys;
+    this.privateKeys = privateKeys;
+  }
+
+  getPublicKey(keyName, _metadata) {
+    const keyPath = this.publicKeys[keyName];
+    if (keyPath) {
+      try {
+        const key = fs.readFileSync(keyPath);
+        return { key, _metadata };
+      } catch (e) {
+        return null;
+      }
+    }
+    return null;
+  }
+
+  getPrivateKey(keyName, _metadata) {
+    const keyPath = this.privateKeys[keyName];
+    if (keyPath) {
+      try {
+        const key = fs.readFileSync(keyPath);
+        return { key, _metadata };
+      } catch (e) {
+        return null;
+      }
+    }
+    return null;
+  }
+}
+
+// Integration tests for end-to-end encryption. They create a client for
+// pulsar://localhost:6650 and read the RSA key pair from tests/certificate.
+(() => {
+  describe('Encryption', () => {
+    let client;
+    const publicKeyPath = path.join(__dirname, 
'certificate/public-key.client-rsa.pem');
+    const privateKeyPath = path.join(__dirname, 
'certificate/private-key.client-rsa.pem');
+
+    beforeAll(() => {
+      client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+    });
+
+    afterAll(async () => {
+      await client.close();
+    });
+
+    // Sends one encrypted message and checks that the consumer receives the
+    // original payload together with a populated encryption context.
+    test('End-to-End Encryption', async () => {
+      const topic = 
`persistent://public/default/test-encryption-${Date.now()}`;
+
+      const cryptoKeyReader = new MyCryptoKeyReader(
+        { 'my-key': publicKeyPath },
+        { 'my-key': privateKeyPath },
+      );
+
+      const producer = await client.createProducer({
+        topic,
+        encryptionKeys: ['my-key'],
+        cryptoKeyReader,
+        cryptoFailureAction: 'FAIL',
+      });
+
+      const consumer = await client.subscribe({
+        topic,
+        subscription: 'sub-encryption',
+        cryptoKeyReader,
+        cryptoFailureAction: 'CONSUME',
+        subscriptionInitialPosition: 'Earliest',
+      });
+
+      const msgContent = 'my-secret-message';
+      await producer.send({
+        data: Buffer.from(msgContent),
+      });
+
+      const msg = await consumer.receive();
+      expect(msg.getData().toString()).toBe(msgContent);
+      const encCtx = msg.getEncryptionContext();
+      expect(encCtx).not.toBeNull();
+      expect(encCtx.isDecryptionFailed).toBe(false);
+      expect(encCtx.keys).toBeDefined();
+      expect(encCtx.keys.length).toBeGreaterThan(0);
+      expect(encCtx.keys[0].value).toBeInstanceOf(Buffer);
+      expect(encCtx.param).toBeInstanceOf(Buffer);
+      expect(encCtx.algorithm).toBe('');
+      expect(encCtx.compressionType).toBe('None');
+      expect(encCtx.uncompressedMessageSize).toBe(0);
+      expect(encCtx.batchSize).toBe(1);
+
+      await consumer.acknowledge(msg);
+      await producer.close();
+      await consumer.close();
+    });
+
+    // Same round trip with batching and Zlib compression enabled; every
+    // received message is expected to report the batch size and 'Zlib'.
+    test('End-to-End Encryption with Batching and Compression', async () => {
+      const topic = 
`persistent://public/default/test-encryption-batch-compress-${Date.now()}`;
+
+      const cryptoKeyReader = new MyCryptoKeyReader(
+        { 'my-key': publicKeyPath },
+        { 'my-key': privateKeyPath },
+      );
+
+      const producer = await client.createProducer({
+        topic,
+        encryptionKeys: ['my-key'],
+        cryptoKeyReader,
+        cryptoFailureAction: 'FAIL',
+        batchingEnabled: true,
+        batchingMaxMessages: 10,
+        batchingMaxPublishDelayMs: 100,
+        compressionType: 'Zlib',
+      });
+
+      const consumer = await client.subscribe({
+        topic,
+        subscription: 'sub-encryption-batch-compress',
+        cryptoKeyReader,
+        cryptoFailureAction: 'CONSUME',
+        subscriptionInitialPosition: 'Earliest',
+      });
+
+      // Issue all sends before awaiting any of them.
+      const numMessages = 10;
+      const sendPromises = [];
+      for (let i = 0; i < numMessages; i += 1) {
+        sendPromises.push(producer.send({
+          data: Buffer.from(`message-${i}`),
+        }));
+      }
+      await Promise.all(sendPromises);
+
+      for (let i = 0; i < numMessages; i += 1) {
+        const msg = await consumer.receive();
+        expect(msg.getData().toString()).toBe(`message-${i}`);
+        const encCtx = msg.getEncryptionContext();
+        expect(encCtx).not.toBeNull();
+        expect(encCtx.isDecryptionFailed).toBe(false);
+        expect(encCtx.keys).toBeDefined();
+        expect(encCtx.keys.length).toBeGreaterThan(0);
+        expect(encCtx.keys[0].value).toBeInstanceOf(Buffer);
+        expect(encCtx.param).toBeInstanceOf(Buffer);
+        expect(encCtx.algorithm).toBe('');
+        expect(encCtx.compressionType).toBe('Zlib');
+        expect(encCtx.uncompressedMessageSize).toBeGreaterThan(0);
+        expect(encCtx.batchSize).toBe(numMessages);
+
+        await consumer.acknowledge(msg);
+      }
+
+      await producer.close();
+      await consumer.close();
+    });
+
+    // The consumer is created without a cryptoKeyReader and with the
+    // 'CONSUME' failure action; the test expects a payload that differs from
+    // the plaintext and a context flagged as isDecryptionFailed.
+    test('Decryption Failure', async () => {
+      const topic = 
`persistent://public/default/test-decryption-failure-${Date.now()}`;
+
+      const cryptoKeyReader = new MyCryptoKeyReader(
+        { 'my-key': publicKeyPath },
+        { 'my-key': privateKeyPath },
+      );
+
+      const producer = await client.createProducer({
+        topic,
+        encryptionKeys: ['my-key'],
+        cryptoKeyReader,
+        cryptoFailureAction: 'FAIL',
+      });
+
+      const consumer = await client.subscribe({
+        topic,
+        subscription: 'sub-decryption-failure',
+        cryptoFailureAction: 'CONSUME',
+        subscriptionInitialPosition: 'Earliest',
+      });
+
+      const msgContent = 'my-secret-message';
+      await producer.send({
+        data: Buffer.from(msgContent),
+      });
+
+      const msg = await consumer.receive();
+      expect(msg.getData().toString()).not.toBe(msgContent);
+
+      const encCtx = msg.getEncryptionContext();
+      expect(encCtx).not.toBeNull();
+      expect(encCtx.isDecryptionFailed).toBe(true);
+      expect(encCtx.keys).toBeDefined();
+      expect(encCtx.keys.length).toBeGreaterThan(0);
+      expect(encCtx.keys[0].value).toBeInstanceOf(Buffer);
+      expect(encCtx.param).toBeInstanceOf(Buffer);
+
+      await consumer.acknowledge(msg);
+      await producer.close();
+      await consumer.close();
+    });
+  });
+})();

Reply via email to