This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8cbd222e27aa5ff0c47dac18634b0dedb7ebc5b6 Author: Guangning E <[email protected]> AuthorDate: Fri Feb 19 07:02:11 2021 +0800 [feature][python-client]support python end to end encryption (#9588) * Support python end to end encryption * Add test * Add document for new args * Fixed test by use absolute path (cherry picked from commit cf63ae8480e6b03aca437b658cc10a935129a819) --- pulsar-client-cpp/include/pulsar/CryptoKeyReader.h | 4 ++- pulsar-client-cpp/lib/CryptoKeyReader.cc | 5 +++ pulsar-client-cpp/python/CMakeLists.txt | 16 ++++++--- pulsar-client-cpp/python/pulsar/__init__.py | 39 +++++++++++++++++++++- pulsar-client-cpp/python/pulsar_test.py | 22 +++++++++++- pulsar-client-cpp/python/src/config.cc | 17 ++++++++++ .../python/src/{utils.h => cryptoKeyReader.cc} | 31 +++++------------ pulsar-client-cpp/python/src/pulsar.cc | 2 ++ pulsar-client-cpp/python/src/utils.h | 7 ++++ 9 files changed, 114 insertions(+), 29 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h index 0d81d94..6b371f0 100644 --- a/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h +++ b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h @@ -62,6 +62,8 @@ class PULSAR_PUBLIC CryptoKeyReader { }; /* namespace pulsar */ +typedef std::shared_ptr<CryptoKeyReader> CryptoKeyReaderPtr; + class PULSAR_PUBLIC DefaultCryptoKeyReader : public CryptoKeyReader { private: std::string publicKeyPath_; @@ -76,9 +78,9 @@ class PULSAR_PUBLIC DefaultCryptoKeyReader : public CryptoKeyReader { EncryptionKeyInfo& encKeyInfo) const; Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const; + static CryptoKeyReaderPtr create(const std::string& publicKeyPath, const std::string& privateKeyPath); }; /* namespace pulsar */ -typedef std::shared_ptr<CryptoKeyReader> CryptoKeyReaderPtr; } // namespace pulsar #endif /* CRYPTOKEYREADER_H_ */ diff --git a/pulsar-client-cpp/lib/CryptoKeyReader.cc b/pulsar-client-cpp/lib/CryptoKeyReader.cc index 7a5d9ee..1eb73e8 100644 --- a/pulsar-client-cpp/lib/CryptoKeyReader.cc +++ b/pulsar-client-cpp/lib/CryptoKeyReader.cc @@ -72,4 +72,9 @@ Result DefaultCryptoKeyReader::getPrivateKey(const std::string& keyName, encKeyInfo.setKey(keyContents); return ResultOk; +} + +CryptoKeyReaderPtr DefaultCryptoKeyReader::create(const std::string& publicKeyPath, + const std::string& privateKeyPath) { + return CryptoKeyReaderPtr(new DefaultCryptoKeyReader(publicKeyPath, privateKeyPath)); } \ No newline at end of file diff --git a/pulsar-client-cpp/python/CMakeLists.txt b/pulsar-client-cpp/python/CMakeLists.txt index 70b5bc1..83bc63b 100644 --- a/pulsar-client-cpp/python/CMakeLists.txt +++ b/pulsar-client-cpp/python/CMakeLists.txt @@ -19,10 +19,18 @@ INCLUDE_DIRECTORIES("${Boost_INCLUDE_DIRS}" "${PYTHON_INCLUDE_DIRS}") -ADD_LIBRARY(_pulsar SHARED src/pulsar.cc src/producer.cc src/consumer.cc - src/config.cc src/enums.cc src/client.cc - src/message.cc src/authentication.cc - src/reader.cc src/schema.cc) +ADD_LIBRARY(_pulsar SHARED src/pulsar.cc + src/producer.cc + src/consumer.cc + src/config.cc + src/enums.cc + src/client.cc + src/message.cc + src/authentication.cc + src/reader.cc + src/schema.cc + src/cryptoKeyReader.cc) + SET(CMAKE_SHARED_LIBRARY_PREFIX ) SET(CMAKE_SHARED_LIBRARY_SUFFIX .so) diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index cfae7e0..b47c87d 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -452,6 +452,8 @@ class Client: message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution, properties=None, batching_type=BatchingType.Default, + encryption_key=None, + crypto_key_reader=None ): """ Create a new producer on a given topic. @@ -519,6 +521,11 @@ class Client: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) batched into single batch message: [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)] + * encryption_key: + The key used for symmetric encryption, configured on the producer side + * crypto_key_reader: + Symmetric encryption class implementation, configuring public key encryption messages for the producer + and private key decryption messages for the consumer """ _check_type(str, topic, 'topic') _check_type_or_none(str, producer_name, 'producer_name') @@ -535,6 +542,8 @@ class Client: _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms') _check_type_or_none(dict, properties, 'properties') _check_type(BatchingType, batching_type, 'batching_type') + _check_type_or_none(str, encryption_key, 'encryption_key') + _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') conf = _pulsar.ProducerConfiguration() conf.send_timeout_millis(send_timeout_millis) @@ -557,6 +566,10 @@ class Client: conf.property(k, v) conf.schema(schema.schema_info()) + if encryption_key: + conf.encryption_key(encryption_key) + if crypto_key_reader: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) p = Producer() p._producer = self._client.create_producer(topic, conf) @@ -576,7 +589,8 @@ class Client: is_read_compacted=False, properties=None, pattern_auto_discovery_period=60, - initial_position=InitialPosition.Latest + initial_position=InitialPosition.Latest, + crypto_key_reader=None ): """ Subscribe to the given topic and subscription combination. @@ -649,6 +663,9 @@ class Client: Set the initial position of a consumer when subscribing to the topic. It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`. Default: `Latest`. + * crypto_key_reader: + Symmetric encryption class implementation, configuring public key encryption messages for the producer + and private key decryption messages for the consumer """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -664,6 +681,7 @@ class Client: _check_type(bool, is_read_compacted, 'is_read_compacted') _check_type_or_none(dict, properties, 'properties') _check_type(InitialPosition, initial_position, 'initial_position') + _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -686,6 +704,9 @@ class Client: conf.schema(schema.schema_info()) + if crypto_key_reader: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) + c = Consumer() if isinstance(topic, str): # Single topic @@ -1224,6 +1245,22 @@ class Reader: self._reader.close() self._client._consumers.remove(self) +class CryptoKeyReader: + """ + Default crypto key reader implementation + """ + def __init__(self, public_key_path, private_key_path): + """ + Create crypto key reader. + + **Args** + + * `public_key_path`: Path to the public key + * `private_key_path`: Path to private key + """ + _check_type(str, public_key_path, 'public_key_path') + _check_type(str, private_key_path, 'private_key_path') + self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) def _check_type(var_type, var, name): if not isinstance(var, var_type): diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index f056832..e7d05f3 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -26,7 +26,8 @@ import uuid from datetime import timedelta from pulsar import Client, MessageId, \ CompressionType, ConsumerType, PartitionsRoutingMode, \ - AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition + AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \ + CryptoKeyReader from _pulsar import ProducerConfiguration, ConsumerConfiguration @@ -357,6 +358,25 @@ class PulsarTest(TestCase): client.close() + def test_encryption(self): + publicKeyPath = "/pulsar//pulsar-broker/src/test/resources/certificate/public-key.client-rsa.pem" + privateKeyPath = "/pulsar/pulsar-broker/src/test/resources/certificate/private-key.client-rsa.pem" + crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath) + client = Client(self.serviceUrl) + topic = 'my-python-test-end-to-end-encryption' + consumer = client.subscribe(topic=topic, + subscription_name='my-subscription', + crypto_key_reader=crypto_key_reader) + producer = client.create_producer(topic=topic, + encryption_key="client-rsa.pem", + crypto_key_reader=crypto_key_reader) + producer.send('hello') + msg = consumer.receive(TM) + self.assertTrue(msg) + self.assertEqual(msg.value(), 'hello') + consumer.unsubscribe() + client.close() + def test_tls_auth3(self): certs_dir = '/pulsar/pulsar-broker/src/test/resources/authentication/tls/' if not os.path.exists(certs_dir): diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index 9aadf92..188aaf5 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -74,6 +74,20 @@ static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfigur return conf; } +static ConsumerConfiguration& ConsumerConfiguration_setCryptoKeyReader(ConsumerConfiguration& conf, + py::object cryptoKeyReader) { + CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader); + conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader); + return conf; +} + +static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerConfiguration& conf, + py::object cryptoKeyReader) { + CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader); + conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader); + return conf; +} + void export_config() { using namespace boost::python; @@ -128,6 +142,8 @@ void export_config() { .def("property", &ProducerConfiguration::setProperty, return_self<>()) .def("batching_type", &ProducerConfiguration::setBatchingType, return_self<>()) .def("batching_type", &ProducerConfiguration::getBatchingType) + .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>()) + .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>()) ; class_<ConsumerConfiguration>("ConsumerConfiguration") @@ -155,6 +171,7 @@ void export_config() { .def("property", &ConsumerConfiguration::setProperty, return_self<>()) .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition) .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition) + .def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>()) ; class_<ReaderConfiguration>("ReaderConfiguration") diff --git a/pulsar-client-cpp/python/src/utils.h b/pulsar-client-cpp/python/src/cryptoKeyReader.cc similarity index 59% copy from pulsar-client-cpp/python/src/utils.h copy to pulsar-client-cpp/python/src/cryptoKeyReader.cc index 8471d03..ccefe6f 100644 --- a/pulsar-client-cpp/python/src/utils.h +++ b/pulsar-client-cpp/python/src/cryptoKeyReader.cc @@ -16,30 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -#include <boost/python.hpp> +#include "utils.h" -#include <pulsar/Client.h> -#include <pulsar/MessageBatch.h> +CryptoKeyReaderWrapper::CryptoKeyReaderWrapper() {} -using namespace pulsar; - -namespace py = boost::python; - -struct PulsarException { - Result _result; - PulsarException(Result res) : - _result(res) {} -}; - -inline void CHECK_RESULT(Result res) { - if (res != ResultOk) { - throw PulsarException(res); - } +CryptoKeyReaderWrapper::CryptoKeyReaderWrapper(const std::string& publicKeyPath, + const std::string& privateKeyPath) { + this->cryptoKeyReader = DefaultCryptoKeyReader::create(publicKeyPath, privateKeyPath); } -struct AuthenticationWrapper { - AuthenticationPtr auth; +void export_cryptoKeyReader() { + using namespace boost::python; - AuthenticationWrapper(); - AuthenticationWrapper(const std::string& dynamicLibPath, const std::string& authParamsString); -}; + class_<CryptoKeyReaderWrapper>("CryptoKeyReader", init<const std::string&, const std::string&>()); +} \ No newline at end of file diff --git a/pulsar-client-cpp/python/src/pulsar.cc b/pulsar-client-cpp/python/src/pulsar.cc index b26a252..f80c9a4 100644 --- a/pulsar-client-cpp/python/src/pulsar.cc +++ b/pulsar-client-cpp/python/src/pulsar.cc @@ -27,6 +27,7 @@ void export_config(); void export_enums(); void export_authentication(); void export_schema(); +void export_cryptoKeyReader(); static void translateException(const PulsarException& ex) { @@ -53,4 +54,5 @@ BOOST_PYTHON_MODULE(_pulsar) export_enums(); export_authentication(); export_schema(); + export_cryptoKeyReader(); } diff --git a/pulsar-client-cpp/python/src/utils.h b/pulsar-client-cpp/python/src/utils.h index 8471d03..457d1f8 100644 --- a/pulsar-client-cpp/python/src/utils.h +++ b/pulsar-client-cpp/python/src/utils.h @@ -43,3 +43,10 @@ struct AuthenticationWrapper { AuthenticationWrapper(); AuthenticationWrapper(const std::string& dynamicLibPath, const std::string& authParamsString); }; + +struct CryptoKeyReaderWrapper { + CryptoKeyReaderPtr cryptoKeyReader; + + CryptoKeyReaderWrapper(); + CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath); +};
