This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8cbd222e27aa5ff0c47dac18634b0dedb7ebc5b6
Author: Guangning E <[email protected]>
AuthorDate: Fri Feb 19 07:02:11 2021 +0800

    [feature][python-client]support python end to end encryption (#9588)
    
    * Support python end to end encryption
    
    * Add test
    
    * Add document for new args
    
    * Fixed test by using an absolute path
    
    (cherry picked from commit cf63ae8480e6b03aca437b658cc10a935129a819)
---
 pulsar-client-cpp/include/pulsar/CryptoKeyReader.h |  4 ++-
 pulsar-client-cpp/lib/CryptoKeyReader.cc           |  5 +++
 pulsar-client-cpp/python/CMakeLists.txt            | 16 ++++++---
 pulsar-client-cpp/python/pulsar/__init__.py        | 39 +++++++++++++++++++++-
 pulsar-client-cpp/python/pulsar_test.py            | 22 +++++++++++-
 pulsar-client-cpp/python/src/config.cc             | 17 ++++++++++
 .../python/src/{utils.h => cryptoKeyReader.cc}     | 31 +++++------------
 pulsar-client-cpp/python/src/pulsar.cc             |  2 ++
 pulsar-client-cpp/python/src/utils.h               |  7 ++++
 9 files changed, 114 insertions(+), 29 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h 
b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h
index 0d81d94..6b371f0 100644
--- a/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h
+++ b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h
@@ -62,6 +62,8 @@ class PULSAR_PUBLIC CryptoKeyReader {
 
 }; /* namespace pulsar */
 
+typedef std::shared_ptr<CryptoKeyReader> CryptoKeyReaderPtr;
+
 class PULSAR_PUBLIC DefaultCryptoKeyReader : public CryptoKeyReader {
    private:
     std::string publicKeyPath_;
@@ -76,9 +78,9 @@ class PULSAR_PUBLIC DefaultCryptoKeyReader : public 
CryptoKeyReader {
                         EncryptionKeyInfo& encKeyInfo) const;
     Result getPrivateKey(const std::string& keyName, std::map<std::string, 
std::string>& metadata,
                          EncryptionKeyInfo& encKeyInfo) const;
+    static CryptoKeyReaderPtr create(const std::string& publicKeyPath, const 
std::string& privateKeyPath);
 }; /* namespace pulsar */
 
-typedef std::shared_ptr<CryptoKeyReader> CryptoKeyReaderPtr;
 }  // namespace pulsar
 
 #endif /* CRYPTOKEYREADER_H_ */
diff --git a/pulsar-client-cpp/lib/CryptoKeyReader.cc 
b/pulsar-client-cpp/lib/CryptoKeyReader.cc
index 7a5d9ee..1eb73e8 100644
--- a/pulsar-client-cpp/lib/CryptoKeyReader.cc
+++ b/pulsar-client-cpp/lib/CryptoKeyReader.cc
@@ -72,4 +72,9 @@ Result DefaultCryptoKeyReader::getPrivateKey(const 
std::string& keyName,
 
     encKeyInfo.setKey(keyContents);
     return ResultOk;
+}
+
+CryptoKeyReaderPtr DefaultCryptoKeyReader::create(const std::string& 
publicKeyPath,
+                                                  const std::string& 
privateKeyPath) {
+    return CryptoKeyReaderPtr(new DefaultCryptoKeyReader(publicKeyPath, 
privateKeyPath));
 }
\ No newline at end of file
diff --git a/pulsar-client-cpp/python/CMakeLists.txt 
b/pulsar-client-cpp/python/CMakeLists.txt
index 70b5bc1..83bc63b 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -19,10 +19,18 @@
 
 INCLUDE_DIRECTORIES("${Boost_INCLUDE_DIRS}" "${PYTHON_INCLUDE_DIRS}")
 
-ADD_LIBRARY(_pulsar SHARED src/pulsar.cc src/producer.cc src/consumer.cc
-                            src/config.cc src/enums.cc src/client.cc
-                            src/message.cc src/authentication.cc
-                            src/reader.cc src/schema.cc)
+ADD_LIBRARY(_pulsar SHARED src/pulsar.cc
+                           src/producer.cc
+                           src/consumer.cc
+                           src/config.cc
+                           src/enums.cc
+                           src/client.cc
+                           src/message.cc
+                           src/authentication.cc
+                           src/reader.cc
+                           src/schema.cc
+                           src/cryptoKeyReader.cc)
+
 SET(CMAKE_SHARED_LIBRARY_PREFIX )
 SET(CMAKE_SHARED_LIBRARY_SUFFIX .so)
 
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index cfae7e0..b47c87d 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -452,6 +452,8 @@ class Client:
                         
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
                         properties=None,
                         batching_type=BatchingType.Default,
+                        encryption_key=None,
+                        crypto_key_reader=None
                         ):
         """
         Create a new producer on a given topic.
@@ -519,6 +521,11 @@ class Client:
             (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, 
v3), (k2, v3), (k3, v3)
             batched into single batch message:
             [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], 
[(k3, v1), (k3, v2), (k3, v3)]
+        * encryption_key:
+           The key used for symmetric encryption, configured on the producer 
side
+        * crypto_key_reader:
+           Symmetric encryption class implementation, configuring public key 
encryption messages for the producer
+           and private key decryption messages for the consumer
         """
         _check_type(str, topic, 'topic')
         _check_type_or_none(str, producer_name, 'producer_name')
@@ -535,6 +542,8 @@ class Client:
         _check_type(int, batching_max_publish_delay_ms, 
'batching_max_publish_delay_ms')
         _check_type_or_none(dict, properties, 'properties')
         _check_type(BatchingType, batching_type, 'batching_type')
+        _check_type_or_none(str, encryption_key, 'encryption_key')
+        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 
'crypto_key_reader')
 
         conf = _pulsar.ProducerConfiguration()
         conf.send_timeout_millis(send_timeout_millis)
@@ -557,6 +566,10 @@ class Client:
                 conf.property(k, v)
 
         conf.schema(schema.schema_info())
+        if encryption_key:
+            conf.encryption_key(encryption_key)
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
 
         p = Producer()
         p._producer = self._client.create_producer(topic, conf)
@@ -576,7 +589,8 @@ class Client:
                   is_read_compacted=False,
                   properties=None,
                   pattern_auto_discovery_period=60,
-                  initial_position=InitialPosition.Latest
+                  initial_position=InitialPosition.Latest,
+                  crypto_key_reader=None
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -649,6 +663,9 @@ class Client:
           Set the initial position of a consumer  when subscribing to the 
topic.
           It could be either: `InitialPosition.Earliest` or 
`InitialPosition.Latest`.
           Default: `Latest`.
+        * crypto_key_reader:
+           Symmetric encryption class implementation, configuring public key 
encryption messages for the producer
+           and private key decryption messages for the consumer
         """
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -664,6 +681,7 @@ class Client:
         _check_type(bool, is_read_compacted, 'is_read_compacted')
         _check_type_or_none(dict, properties, 'properties')
         _check_type(InitialPosition, initial_position, 'initial_position')
+        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 
'crypto_key_reader')
 
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
@@ -686,6 +704,9 @@ class Client:
 
         conf.schema(schema.schema_info())
 
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
         c = Consumer()
         if isinstance(topic, str):
             # Single topic
@@ -1224,6 +1245,22 @@ class Reader:
         self._reader.close()
         self._client._consumers.remove(self)
 
+class CryptoKeyReader:
+    """
+    Default crypto key reader implementation
+    """
+    def __init__(self, public_key_path, private_key_path):
+        """
+        Create crypto key reader.
+
+        **Args**
+
+        * `public_key_path`: Path to the public key
+        * `private_key_path`: Path to private key
+        """
+        _check_type(str, public_key_path, 'public_key_path')
+        _check_type(str, private_key_path, 'private_key_path')
+        self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, 
private_key_path)
 
 def _check_type(var_type, var, name):
     if not isinstance(var, var_type):
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index f056832..e7d05f3 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -26,7 +26,8 @@ import uuid
 from datetime import timedelta
 from pulsar import Client, MessageId, \
             CompressionType, ConsumerType, PartitionsRoutingMode, \
-            AuthenticationTLS, Authentication, AuthenticationToken, 
InitialPosition
+            AuthenticationTLS, Authentication, AuthenticationToken, 
InitialPosition, \
+            CryptoKeyReader
 
 from _pulsar import ProducerConfiguration, ConsumerConfiguration
 
@@ -357,6 +358,25 @@ class PulsarTest(TestCase):
 
         client.close()
 
+    def test_encryption(self):
+        publicKeyPath = 
"/pulsar//pulsar-broker/src/test/resources/certificate/public-key.client-rsa.pem"
+        privateKeyPath = 
"/pulsar/pulsar-broker/src/test/resources/certificate/private-key.client-rsa.pem"
+        crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
+        client = Client(self.serviceUrl)
+        topic = 'my-python-test-end-to-end-encryption'
+        consumer = client.subscribe(topic=topic,
+                                    subscription_name='my-subscription',
+                                    crypto_key_reader=crypto_key_reader)
+        producer = client.create_producer(topic=topic,
+                                          encryption_key="client-rsa.pem",
+                                          crypto_key_reader=crypto_key_reader)
+        producer.send('hello')
+        msg = consumer.receive(TM)
+        self.assertTrue(msg)
+        self.assertEqual(msg.value(), 'hello')
+        consumer.unsubscribe()
+        client.close()
+
     def test_tls_auth3(self):
         certs_dir = 
'/pulsar/pulsar-broker/src/test/resources/authentication/tls/'
         if not os.path.exists(certs_dir):
diff --git a/pulsar-client-cpp/python/src/config.cc 
b/pulsar-client-cpp/python/src/config.cc
index 9aadf92..188aaf5 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -74,6 +74,20 @@ static ClientConfiguration& 
ClientConfiguration_setAuthentication(ClientConfigur
     return conf;
 }
 
+static ConsumerConfiguration& 
ConsumerConfiguration_setCryptoKeyReader(ConsumerConfiguration& conf,
+                                                                        
py::object cryptoKeyReader) {
+    CryptoKeyReaderWrapper cryptoKeyReaderWrapper = 
py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
+    conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
+    return conf;
+}
+
+static ProducerConfiguration& 
ProducerConfiguration_setCryptoKeyReader(ProducerConfiguration& conf,
+                                                                        
py::object cryptoKeyReader) {
+    CryptoKeyReaderWrapper cryptoKeyReaderWrapper = 
py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
+    conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
+    return conf;
+}
+
 void export_config() {
     using namespace boost::python;
 
@@ -128,6 +142,8 @@ void export_config() {
             .def("property", &ProducerConfiguration::setProperty, 
return_self<>())
             .def("batching_type", &ProducerConfiguration::setBatchingType, 
return_self<>())
             .def("batching_type", &ProducerConfiguration::getBatchingType)
+            .def("encryption_key", &ProducerConfiguration::addEncryptionKey, 
return_self<>())
+            .def("crypto_key_reader", 
&ProducerConfiguration_setCryptoKeyReader, return_self<>())
             ;
 
     class_<ConsumerConfiguration>("ConsumerConfiguration")
@@ -155,6 +171,7 @@ void export_config() {
             .def("property", &ConsumerConfiguration::setProperty, 
return_self<>())
             .def("subscription_initial_position", 
&ConsumerConfiguration::getSubscriptionInitialPosition)
             .def("subscription_initial_position", 
&ConsumerConfiguration::setSubscriptionInitialPosition)
+            .def("crypto_key_reader", 
&ConsumerConfiguration_setCryptoKeyReader, return_self<>())
             ;
 
     class_<ReaderConfiguration>("ReaderConfiguration")
diff --git a/pulsar-client-cpp/python/src/utils.h 
b/pulsar-client-cpp/python/src/cryptoKeyReader.cc
similarity index 59%
copy from pulsar-client-cpp/python/src/utils.h
copy to pulsar-client-cpp/python/src/cryptoKeyReader.cc
index 8471d03..ccefe6f 100644
--- a/pulsar-client-cpp/python/src/utils.h
+++ b/pulsar-client-cpp/python/src/cryptoKeyReader.cc
@@ -16,30 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <boost/python.hpp>
+#include "utils.h"
 
-#include <pulsar/Client.h>
-#include <pulsar/MessageBatch.h>
+CryptoKeyReaderWrapper::CryptoKeyReaderWrapper() {}
 
-using namespace pulsar;
-
-namespace py = boost::python;
-
-struct PulsarException {
-    Result _result;
-    PulsarException(Result res) :
-            _result(res) {}
-};
-
-inline void CHECK_RESULT(Result res) {
-    if (res != ResultOk) {
-        throw PulsarException(res);
-    }
+CryptoKeyReaderWrapper::CryptoKeyReaderWrapper(const std::string& 
publicKeyPath,
+                                             const std::string& 
privateKeyPath) {
+    this->cryptoKeyReader = DefaultCryptoKeyReader::create(publicKeyPath, 
privateKeyPath);
 }
 
-struct AuthenticationWrapper {
-    AuthenticationPtr auth;
+void export_cryptoKeyReader() {
+    using namespace boost::python;
 
-    AuthenticationWrapper();
-    AuthenticationWrapper(const std::string& dynamicLibPath, const 
std::string& authParamsString);
-};
+    class_<CryptoKeyReaderWrapper>("CryptoKeyReader", init<const std::string&, 
const std::string&>());
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/python/src/pulsar.cc 
b/pulsar-client-cpp/python/src/pulsar.cc
index b26a252..f80c9a4 100644
--- a/pulsar-client-cpp/python/src/pulsar.cc
+++ b/pulsar-client-cpp/python/src/pulsar.cc
@@ -27,6 +27,7 @@ void export_config();
 void export_enums();
 void export_authentication();
 void export_schema();
+void export_cryptoKeyReader();
 
 
 static void translateException(const PulsarException& ex) {
@@ -53,4 +54,5 @@ BOOST_PYTHON_MODULE(_pulsar)
     export_enums();
     export_authentication();
     export_schema();
+    export_cryptoKeyReader();
 }
diff --git a/pulsar-client-cpp/python/src/utils.h 
b/pulsar-client-cpp/python/src/utils.h
index 8471d03..457d1f8 100644
--- a/pulsar-client-cpp/python/src/utils.h
+++ b/pulsar-client-cpp/python/src/utils.h
@@ -43,3 +43,10 @@ struct AuthenticationWrapper {
     AuthenticationWrapper();
     AuthenticationWrapper(const std::string& dynamicLibPath, const 
std::string& authParamsString);
 };
+
+struct CryptoKeyReaderWrapper {
+    CryptoKeyReaderPtr cryptoKeyReader;
+
+    CryptoKeyReaderWrapper();
+    CryptoKeyReaderWrapper(const std::string& publicKeyPath, const 
std::string& privateKeyPath);
+};

Reply via email to