This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f1aeb06c04daf87d2e5b4dcaa4fa1fa6f5d67b7f
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Dec 21 23:51:16 2021 +0800

    Apply clang-format check for python wrapper (#13418)
    
    (cherry picked from commit 636d5b4c2133bcdf8c1988bbe82eb1064565d5dc)
---
 pulsar-client-cpp/CMakeLists.txt                |   6 +-
 pulsar-client-cpp/python/src/authentication.cc  |  44 ++--
 pulsar-client-cpp/python/src/client.cc          |  63 +++---
 pulsar-client-cpp/python/src/config.cc          | 256 ++++++++++++------------
 pulsar-client-cpp/python/src/consumer.cc        |  81 ++++----
 pulsar-client-cpp/python/src/cryptoKeyReader.cc |   2 +-
 pulsar-client-cpp/python/src/enums.cc           | 162 +++++++--------
 pulsar-client-cpp/python/src/exceptions.cc      |  49 +++--
 pulsar-client-cpp/python/src/message.cc         | 147 ++++++--------
 pulsar-client-cpp/python/src/producer.cc        |  71 ++++---
 pulsar-client-cpp/python/src/pulsar.cc          |   4 +-
 pulsar-client-cpp/python/src/reader.cc          |  48 ++---
 pulsar-client-cpp/python/src/schema.cc          |  10 +-
 pulsar-client-cpp/python/src/utils.h            |   3 +-
 14 files changed, 441 insertions(+), 505 deletions(-)

diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 8ba9569..3fadb05 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -446,7 +446,8 @@ add_custom_target(format python ${BUILD_SUPPORT_DIR}/run_clang_format.py
         ${CMAKE_SOURCE_DIR}/perf
         ${CMAKE_SOURCE_DIR}/examples
         ${CMAKE_SOURCE_DIR}/tests
-        ${CMAKE_SOURCE_DIR}/include)
+        ${CMAKE_SOURCE_DIR}/include
+        ${CMAKE_SOURCE_DIR}/python/src)
 
 # `make check-format` option (for CI test)
 add_custom_target(check-format python ${BUILD_SUPPORT_DIR}/run_clang_format.py
@@ -457,4 +458,5 @@ add_custom_target(check-format python ${BUILD_SUPPORT_DIR}/run_clang_format.py
         ${CMAKE_SOURCE_DIR}/perf
         ${CMAKE_SOURCE_DIR}/examples
         ${CMAKE_SOURCE_DIR}/tests
-        ${CMAKE_SOURCE_DIR}/include)
+        ${CMAKE_SOURCE_DIR}/include
+        ${CMAKE_SOURCE_DIR}/python/src)
diff --git a/pulsar-client-cpp/python/src/authentication.cc b/pulsar-client-cpp/python/src/authentication.cc
index e236c7e..920a717 100644
--- a/pulsar-client-cpp/python/src/authentication.cc
+++ b/pulsar-client-cpp/python/src/authentication.cc
@@ -26,8 +26,8 @@ AuthenticationWrapper::AuthenticationWrapper(const std::string& dynamicLibPath,
 }
 
 struct AuthenticationTlsWrapper : public AuthenticationWrapper {
-    AuthenticationTlsWrapper(const std::string& certificatePath, const 
std::string& privateKeyPath) :
-            AuthenticationWrapper() {
+    AuthenticationTlsWrapper(const std::string& certificatePath, const 
std::string& privateKeyPath)
+        : AuthenticationWrapper() {
         this->auth = AuthTls::create(certificatePath, privateKeyPath);
     }
 };
@@ -35,13 +35,10 @@ struct AuthenticationTlsWrapper : public AuthenticationWrapper {
 struct TokenSupplierWrapper {
     PyObject* _pySupplier;
 
-    TokenSupplierWrapper(py::object pySupplier) :
-        _pySupplier(pySupplier.ptr()) {
-        Py_XINCREF(_pySupplier);
-    }
+    TokenSupplierWrapper(py::object pySupplier) : 
_pySupplier(pySupplier.ptr()) { Py_XINCREF(_pySupplier); }
 
     TokenSupplierWrapper(const TokenSupplierWrapper& other) {
-        _pySupplier= other._pySupplier;
+        _pySupplier = other._pySupplier;
         Py_XINCREF(_pySupplier);
     }
 
@@ -51,9 +48,7 @@ struct TokenSupplierWrapper {
         return *this;
     }
 
-    virtual ~TokenSupplierWrapper() {
-        Py_XDECREF(_pySupplier);
-    }
+    virtual ~TokenSupplierWrapper() { Py_XDECREF(_pySupplier); }
 
     std::string operator()() {
         PyGILState_STATE state = PyGILState_Ensure();
@@ -61,7 +56,7 @@ struct TokenSupplierWrapper {
         std::string token;
         try {
             token = py::call<std::string>(_pySupplier);
-        } catch(const py::error_already_set& e) {
+        } catch (const py::error_already_set& e) {
             PyErr_Print();
         }
 
@@ -70,10 +65,8 @@ struct TokenSupplierWrapper {
     }
 };
 
-
 struct AuthenticationTokenWrapper : public AuthenticationWrapper {
-    AuthenticationTokenWrapper(py::object token) :
-            AuthenticationWrapper() {
+    AuthenticationTokenWrapper(py::object token) : AuthenticationWrapper() {
         if (py::extract<std::string>(token).check()) {
             // It's a string
             std::string tokenStr = py::extract<std::string>(token);
@@ -86,15 +79,13 @@ struct AuthenticationTokenWrapper : public AuthenticationWrapper {
 };
 
 struct AuthenticationAthenzWrapper : public AuthenticationWrapper {
-    AuthenticationAthenzWrapper(const std::string& authParamsString) :
-            AuthenticationWrapper() {
+    AuthenticationAthenzWrapper(const std::string& authParamsString) : 
AuthenticationWrapper() {
         this->auth = AuthAthenz::create(authParamsString);
     }
 };
 
 struct AuthenticationOauth2Wrapper : public AuthenticationWrapper {
-    AuthenticationOauth2Wrapper(const std::string& authParamsString) :
-            AuthenticationWrapper() {
+    AuthenticationOauth2Wrapper(const std::string& authParamsString) : 
AuthenticationWrapper() {
         this->auth = AuthOauth2::create(authParamsString);
     }
 };
@@ -102,22 +93,17 @@ struct AuthenticationOauth2Wrapper : public AuthenticationWrapper {
 void export_authentication() {
     using namespace boost::python;
 
-    class_<AuthenticationWrapper>("Authentication", init<const std::string&, 
const std::string&>())
-            ;
+    class_<AuthenticationWrapper>("Authentication", init<const std::string&, 
const std::string&>());
 
-    class_<AuthenticationTlsWrapper, bases<AuthenticationWrapper> 
>("AuthenticationTLS",
-                                                                    init<const 
std::string&, const std::string&>())
-            ;
+    class_<AuthenticationTlsWrapper, bases<AuthenticationWrapper> >(
+        "AuthenticationTLS", init<const std::string&, const std::string&>());
 
     class_<AuthenticationTokenWrapper, bases<AuthenticationWrapper> 
>("AuthenticationToken",
-                                                                    
init<py::object>())
-            ;
+                                                                      
init<py::object>());
 
     class_<AuthenticationAthenzWrapper, bases<AuthenticationWrapper> 
>("AuthenticationAthenz",
-                                                                       
init<const std::string&>())
-            ;
+                                                                       
init<const std::string&>());
 
     class_<AuthenticationOauth2Wrapper, bases<AuthenticationWrapper> 
>("AuthenticationOauth2",
-                                                                       
init<const std::string&>())
-            ;
+                                                                       
init<const std::string&>());
 }
diff --git a/pulsar-client-cpp/python/src/client.cc b/pulsar-client-cpp/python/src/client.cc
index 3dcbf7f..445a6dc 100644
--- a/pulsar-client-cpp/python/src/client.cc
+++ b/pulsar-client-cpp/python/src/client.cc
@@ -22,11 +22,10 @@ Producer Client_createProducer(Client& client, const std::string& topic, const P
     Producer producer;
     Result res;
 
-    Py_BEGIN_ALLOW_THREADS
-    res = client.createProducer(topic, conf, producer);
+    Py_BEGIN_ALLOW_THREADS res = client.createProducer(topic, conf, producer);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
     return producer;
 }
 
@@ -35,11 +34,10 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s
     Consumer consumer;
     Result res;
 
-    Py_BEGIN_ALLOW_THREADS
-    res = client.subscribe(topic, subscriptionName, conf, consumer);
+    Py_BEGIN_ALLOW_THREADS res = client.subscribe(topic, subscriptionName, 
conf, consumer);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
     return consumer;
 }
 
@@ -50,43 +48,39 @@ Consumer Client_subscribe_topics(Client& client, boost::python::list& topics,
 
     std::vector<std::string> topics_vector;
 
-    for (int i = 0; i < len(topics); i ++) {
+    for (int i = 0; i < len(topics); i++) {
         std::string content = boost::python::extract<std::string>(topics[i]);
         topics_vector.push_back(content);
     }
 
-    Py_BEGIN_ALLOW_THREADS
-        res = client.subscribe(topics_vector, subscriptionName, conf, 
consumer);
+    Py_BEGIN_ALLOW_THREADS res = client.subscribe(topics_vector, 
subscriptionName, conf, consumer);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
     return consumer;
 }
 
-Consumer Client_subscribe_pattern(Client& client, const std::string& 
topic_pattern, const std::string& subscriptionName,
-                                 const ConsumerConfiguration& conf) {
+Consumer Client_subscribe_pattern(Client& client, const std::string& 
topic_pattern,
+                                  const std::string& subscriptionName, const 
ConsumerConfiguration& conf) {
     Consumer consumer;
     Result res;
 
-    Py_BEGIN_ALLOW_THREADS
-        res = client.subscribeWithRegex(topic_pattern, subscriptionName, conf, 
consumer);
+    Py_BEGIN_ALLOW_THREADS res = client.subscribeWithRegex(topic_pattern, 
subscriptionName, conf, consumer);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
     return consumer;
 }
 
-Reader Client_createReader(Client& client, const std::string& topic,
-                           const MessageId& startMessageId,
+Reader Client_createReader(Client& client, const std::string& topic, const 
MessageId& startMessageId,
                            const ReaderConfiguration& conf) {
     Reader reader;
     Result res;
 
-    Py_BEGIN_ALLOW_THREADS
-    res = client.createReader(topic, startMessageId, conf, reader);
+    Py_BEGIN_ALLOW_THREADS res = client.createReader(topic, startMessageId, 
conf, reader);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
     return reader;
 }
 
@@ -94,11 +88,10 @@ boost::python::list Client_getTopicPartitions(Client& client, const std::string&
     std::vector<std::string> partitions;
     Result res;
 
-    Py_BEGIN_ALLOW_THREADS
-    res = client.getPartitionsForTopic(topic, partitions);
+    Py_BEGIN_ALLOW_THREADS res = client.getPartitionsForTopic(topic, 
partitions);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 
     boost::python::list pyList;
     for (int i = 0; i < partitions.size(); i++) {
@@ -111,24 +104,22 @@ boost::python::list Client_getTopicPartitions(Client& client, const std::string&
 void Client_close(Client& client) {
     Result res;
 
-    Py_BEGIN_ALLOW_THREADS
-    res = client.close();
+    Py_BEGIN_ALLOW_THREADS res = client.close();
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 }
 
 void export_client() {
     using namespace boost::python;
 
-    class_<Client>("Client", init<const std::string&, const 
ClientConfiguration& >())
-            .def("create_producer", &Client_createProducer)
-            .def("subscribe", &Client_subscribe)
-            .def("subscribe_topics", &Client_subscribe_topics)
-            .def("subscribe_pattern", &Client_subscribe_pattern)
-            .def("create_reader", &Client_createReader)
-            .def("get_topic_partitions", &Client_getTopicPartitions)
-            .def("close", &Client_close)
-            .def("shutdown", &Client::shutdown)
-            ;
+    class_<Client>("Client", init<const std::string&, const 
ClientConfiguration&>())
+        .def("create_producer", &Client_createProducer)
+        .def("subscribe", &Client_subscribe)
+        .def("subscribe_topics", &Client_subscribe_topics)
+        .def("subscribe_pattern", &Client_subscribe_pattern)
+        .def("create_reader", &Client_createReader)
+        .def("get_topic_partitions", &Client_getTopicPartitions)
+        .def("close", &Client_close)
+        .def("shutdown", &Client::shutdown);
 }
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index a287248..2dee1a1 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -21,14 +21,11 @@
 #include "lib/Utils.h"
 #include <memory>
 
-template<typename T>
+template <typename T>
 struct ListenerWrapper {
     PyObject* _pyListener;
 
-    ListenerWrapper(py::object pyListener) :
-        _pyListener(pyListener.ptr()) {
-        Py_XINCREF(_pyListener);
-    }
+    ListenerWrapper(py::object pyListener) : _pyListener(pyListener.ptr()) { 
Py_XINCREF(_pyListener); }
 
     ListenerWrapper(const ListenerWrapper& other) {
         _pyListener = other._pyListener;
@@ -41,9 +38,7 @@ struct ListenerWrapper {
         return *this;
     }
 
-    virtual ~ListenerWrapper() {
-        Py_XDECREF(_pyListener);
-    }
+    virtual ~ListenerWrapper() { Py_XDECREF(_pyListener); }
 
     void operator()(T consumer, const Message& msg) {
         PyGILState_STATE state = PyGILState_Ensure();
@@ -65,7 +60,7 @@ static ConsumerConfiguration& ConsumerConfiguration_setMessageListener(ConsumerC
 }
 
 static ReaderConfiguration& 
ReaderConfiguration_setReaderListener(ReaderConfiguration& conf,
-                                                                   py::object 
pyListener) {
+                                                                  py::object 
pyListener) {
     conf.setReaderListener(ListenerWrapper<Reader>(pyListener));
     return conf;
 }
@@ -78,41 +73,36 @@ static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfigur
 }
 
 static ConsumerConfiguration& 
ConsumerConfiguration_setCryptoKeyReader(ConsumerConfiguration& conf,
-                                                                        
py::object cryptoKeyReader) {
+                                                                       
py::object cryptoKeyReader) {
     CryptoKeyReaderWrapper cryptoKeyReaderWrapper = 
py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
     conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
     return conf;
 }
 
 static ProducerConfiguration& 
ProducerConfiguration_setCryptoKeyReader(ProducerConfiguration& conf,
-                                                                        
py::object cryptoKeyReader) {
+                                                                       
py::object cryptoKeyReader) {
     CryptoKeyReaderWrapper cryptoKeyReaderWrapper = 
py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
     conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
     return conf;
 }
 
 static ReaderConfiguration& 
ReaderConfiguration_setCryptoKeyReader(ReaderConfiguration& conf,
-                                                                        
py::object cryptoKeyReader) {
+                                                                   py::object 
cryptoKeyReader) {
     CryptoKeyReaderWrapper cryptoKeyReaderWrapper = 
py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
     conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
     return conf;
 }
 
-class LoggerWrapper: public Logger {
+class LoggerWrapper : public Logger {
     PyObject* const _pyLogger;
     const int _pythonLogLevel;
     const std::unique_ptr<Logger> _fallbackLogger;
 
-    static constexpr int _getLogLevelValue(Level level) {
-        return 10 + (level * 10);
-    }
+    static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 
10); }
 
    public:
-
     LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* 
fallbackLogger)
-        : _pyLogger(pyLogger),
-          _pythonLogLevel(pythonLogLevel),
-          _fallbackLogger(fallbackLogger) {
+        : _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), 
_fallbackLogger(fallbackLogger) {
         Py_XINCREF(_pyLogger);
     }
 
@@ -121,13 +111,9 @@ class LoggerWrapper: public Logger {
     LoggerWrapper& operator=(const LoggerWrapper&) = delete;
     LoggerWrapper& operator=(LoggerWrapper&&) = delete;
 
-    virtual ~LoggerWrapper() {
-        Py_XDECREF(_pyLogger);
-    }
+    virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); }
 
-    bool isEnabled(Level level) {
-        return _getLogLevelValue(level) >= _pythonLogLevel;
-    }
+    bool isEnabled(Level level) { return _getLogLevelValue(level) >= 
_pythonLogLevel; }
 
     void log(Level level, int line, const std::string& message) {
         if (!Py_IsInitialized()) {
@@ -187,11 +173,9 @@ class LoggerWrapperFactory : public LoggerFactory {
         initializePythonLogLevel();
     }
 
-    virtual ~LoggerWrapperFactory() {
-        Py_XDECREF(_pyLogger);
-    }
+    virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); }
 
-    Logger* getLogger(const std::string &fileName) {
+    Logger* getLogger(const std::string& fileName) {
         const auto fallbackLogger = 
_fallbackLoggerFactory->getLogger(fileName);
         if (_pythonLogLevel.is_present()) {
             return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), 
fallbackLogger);
@@ -206,112 +190,128 @@ static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& c
     return conf;
 }
 
-
 void export_config() {
     using namespace boost::python;
 
     class_<ClientConfiguration>("ClientConfiguration")
-            .def("authentication", &ClientConfiguration_setAuthentication, 
return_self<>())
-            .def("operation_timeout_seconds", 
&ClientConfiguration::getOperationTimeoutSeconds)
-            .def("operation_timeout_seconds", 
&ClientConfiguration::setOperationTimeoutSeconds, return_self<>())
-            .def("connection_timeout", 
&ClientConfiguration::getConnectionTimeout)
-            .def("connection_timeout", 
&ClientConfiguration::setConnectionTimeout, return_self<>())
-            .def("io_threads", &ClientConfiguration::getIOThreads)
-            .def("io_threads", &ClientConfiguration::setIOThreads, 
return_self<>())
-            .def("message_listener_threads", 
&ClientConfiguration::getMessageListenerThreads)
-            .def("message_listener_threads", 
&ClientConfiguration::setMessageListenerThreads, return_self<>())
-            .def("concurrent_lookup_requests", 
&ClientConfiguration::getConcurrentLookupRequest)
-            .def("concurrent_lookup_requests", 
&ClientConfiguration::setConcurrentLookupRequest, return_self<>())
-            .def("log_conf_file_path", 
&ClientConfiguration::getLogConfFilePath, 
return_value_policy<copy_const_reference>())
-            .def("log_conf_file_path", 
&ClientConfiguration::setLogConfFilePath, return_self<>())
-            .def("use_tls", &ClientConfiguration::isUseTls)
-            .def("use_tls", &ClientConfiguration::setUseTls, return_self<>())
-            .def("tls_trust_certs_file_path", 
&ClientConfiguration::getTlsTrustCertsFilePath, 
return_value_policy<copy_const_reference>())
-            .def("tls_trust_certs_file_path", 
&ClientConfiguration::setTlsTrustCertsFilePath, return_self<>())
-            .def("tls_allow_insecure_connection", 
&ClientConfiguration::isTlsAllowInsecureConnection)
-            .def("tls_allow_insecure_connection", 
&ClientConfiguration::setTlsAllowInsecureConnection, return_self<>())
-            .def("tls_validate_hostname", 
&ClientConfiguration::setValidateHostName, return_self<>())
-            .def("set_logger", &ClientConfiguration_setLogger, return_self<>())
-            ;
+        .def("authentication", &ClientConfiguration_setAuthentication, 
return_self<>())
+        .def("operation_timeout_seconds", 
&ClientConfiguration::getOperationTimeoutSeconds)
+        .def("operation_timeout_seconds", 
&ClientConfiguration::setOperationTimeoutSeconds, return_self<>())
+        .def("connection_timeout", &ClientConfiguration::getConnectionTimeout)
+        .def("connection_timeout", &ClientConfiguration::setConnectionTimeout, 
return_self<>())
+        .def("io_threads", &ClientConfiguration::getIOThreads)
+        .def("io_threads", &ClientConfiguration::setIOThreads, return_self<>())
+        .def("message_listener_threads", 
&ClientConfiguration::getMessageListenerThreads)
+        .def("message_listener_threads", 
&ClientConfiguration::setMessageListenerThreads, return_self<>())
+        .def("concurrent_lookup_requests", 
&ClientConfiguration::getConcurrentLookupRequest)
+        .def("concurrent_lookup_requests", 
&ClientConfiguration::setConcurrentLookupRequest, return_self<>())
+        .def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath,
+             return_value_policy<copy_const_reference>())
+        .def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, 
return_self<>())
+        .def("use_tls", &ClientConfiguration::isUseTls)
+        .def("use_tls", &ClientConfiguration::setUseTls, return_self<>())
+        .def("tls_trust_certs_file_path", 
&ClientConfiguration::getTlsTrustCertsFilePath,
+             return_value_policy<copy_const_reference>())
+        .def("tls_trust_certs_file_path", 
&ClientConfiguration::setTlsTrustCertsFilePath, return_self<>())
+        .def("tls_allow_insecure_connection", 
&ClientConfiguration::isTlsAllowInsecureConnection)
+        .def("tls_allow_insecure_connection", 
&ClientConfiguration::setTlsAllowInsecureConnection,
+             return_self<>())
+        .def("tls_validate_hostname", 
&ClientConfiguration::setValidateHostName, return_self<>())
+        .def("set_logger", &ClientConfiguration_setLogger, return_self<>());
 
     class_<ProducerConfiguration>("ProducerConfiguration")
-            .def("producer_name", &ProducerConfiguration::getProducerName, 
return_value_policy<copy_const_reference>())
-            .def("producer_name", &ProducerConfiguration::setProducerName, 
return_self<>())
-            .def("schema", &ProducerConfiguration::getSchema, 
return_value_policy<copy_const_reference>())
-            .def("schema", &ProducerConfiguration::setSchema, return_self<>())
-            .def("send_timeout_millis", &ProducerConfiguration::getSendTimeout)
-            .def("send_timeout_millis", 
&ProducerConfiguration::setSendTimeout, return_self<>())
-            .def("initial_sequence_id", 
&ProducerConfiguration::getInitialSequenceId)
-            .def("initial_sequence_id", 
&ProducerConfiguration::setInitialSequenceId, return_self<>())
-            .def("compression_type", 
&ProducerConfiguration::getCompressionType)
-            .def("compression_type", 
&ProducerConfiguration::setCompressionType, return_self<>())
-            .def("max_pending_messages", 
&ProducerConfiguration::getMaxPendingMessages)
-            .def("max_pending_messages", 
&ProducerConfiguration::setMaxPendingMessages, return_self<>())
-            .def("max_pending_messages_across_partitions", 
&ProducerConfiguration::getMaxPendingMessagesAcrossPartitions)
-            .def("max_pending_messages_across_partitions", 
&ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>())
-            .def("block_if_queue_full", 
&ProducerConfiguration::getBlockIfQueueFull)
-            .def("block_if_queue_full", 
&ProducerConfiguration::setBlockIfQueueFull, return_self<>())
-            .def("partitions_routing_mode", 
&ProducerConfiguration::getPartitionsRoutingMode)
-            .def("partitions_routing_mode", 
&ProducerConfiguration::setPartitionsRoutingMode, return_self<>())
-            .def("lazy_start_partitioned_producers", 
&ProducerConfiguration::getLazyStartPartitionedProducers)
-            .def("lazy_start_partitioned_producers", 
&ProducerConfiguration::setLazyStartPartitionedProducers, return_self<>())
-            .def("batching_enabled", 
&ProducerConfiguration::getBatchingEnabled, 
return_value_policy<copy_const_reference>())
-            .def("batching_enabled", 
&ProducerConfiguration::setBatchingEnabled, return_self<>())
-            .def("batching_max_messages", 
&ProducerConfiguration::getBatchingMaxMessages, 
return_value_policy<copy_const_reference>())
-            .def("batching_max_messages", 
&ProducerConfiguration::setBatchingMaxMessages, return_self<>())
-            .def("batching_max_allowed_size_in_bytes", 
&ProducerConfiguration::getBatchingMaxAllowedSizeInBytes, 
return_value_policy<copy_const_reference>())
-            .def("batching_max_allowed_size_in_bytes", 
&ProducerConfiguration::setBatchingMaxAllowedSizeInBytes, return_self<>())
-            .def("batching_max_publish_delay_ms", 
&ProducerConfiguration::getBatchingMaxPublishDelayMs, 
return_value_policy<copy_const_reference>())
-            .def("batching_max_publish_delay_ms", 
&ProducerConfiguration::setBatchingMaxPublishDelayMs, return_self<>())
-            .def("property", &ProducerConfiguration::setProperty, 
return_self<>())
-            .def("batching_type", &ProducerConfiguration::setBatchingType, 
return_self<>())
-            .def("batching_type", &ProducerConfiguration::getBatchingType)
-            .def("encryption_key", &ProducerConfiguration::addEncryptionKey, 
return_self<>())
-            .def("crypto_key_reader", 
&ProducerConfiguration_setCryptoKeyReader, return_self<>())
-            ;
+        .def("producer_name", &ProducerConfiguration::getProducerName,
+             return_value_policy<copy_const_reference>())
+        .def("producer_name", &ProducerConfiguration::setProducerName, 
return_self<>())
+        .def("schema", &ProducerConfiguration::getSchema, 
return_value_policy<copy_const_reference>())
+        .def("schema", &ProducerConfiguration::setSchema, return_self<>())
+        .def("send_timeout_millis", &ProducerConfiguration::getSendTimeout)
+        .def("send_timeout_millis", &ProducerConfiguration::setSendTimeout, 
return_self<>())
+        .def("initial_sequence_id", 
&ProducerConfiguration::getInitialSequenceId)
+        .def("initial_sequence_id", 
&ProducerConfiguration::setInitialSequenceId, return_self<>())
+        .def("compression_type", &ProducerConfiguration::getCompressionType)
+        .def("compression_type", &ProducerConfiguration::setCompressionType, 
return_self<>())
+        .def("max_pending_messages", 
&ProducerConfiguration::getMaxPendingMessages)
+        .def("max_pending_messages", 
&ProducerConfiguration::setMaxPendingMessages, return_self<>())
+        .def("max_pending_messages_across_partitions",
+             &ProducerConfiguration::getMaxPendingMessagesAcrossPartitions)
+        .def("max_pending_messages_across_partitions",
+             &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, 
return_self<>())
+        .def("block_if_queue_full", 
&ProducerConfiguration::getBlockIfQueueFull)
+        .def("block_if_queue_full", 
&ProducerConfiguration::setBlockIfQueueFull, return_self<>())
+        .def("partitions_routing_mode", 
&ProducerConfiguration::getPartitionsRoutingMode)
+        .def("partitions_routing_mode", 
&ProducerConfiguration::setPartitionsRoutingMode, return_self<>())
+        .def("lazy_start_partitioned_producers", 
&ProducerConfiguration::getLazyStartPartitionedProducers)
+        .def("lazy_start_partitioned_producers", 
&ProducerConfiguration::setLazyStartPartitionedProducers,
+             return_self<>())
+        .def("batching_enabled", &ProducerConfiguration::getBatchingEnabled,
+             return_value_policy<copy_const_reference>())
+        .def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, 
return_self<>())
+        .def("batching_max_messages", 
&ProducerConfiguration::getBatchingMaxMessages,
+             return_value_policy<copy_const_reference>())
+        .def("batching_max_messages", 
&ProducerConfiguration::setBatchingMaxMessages, return_self<>())
+        .def("batching_max_allowed_size_in_bytes", 
&ProducerConfiguration::getBatchingMaxAllowedSizeInBytes,
+             return_value_policy<copy_const_reference>())
+        .def("batching_max_allowed_size_in_bytes", 
&ProducerConfiguration::setBatchingMaxAllowedSizeInBytes,
+             return_self<>())
+        .def("batching_max_publish_delay_ms", 
&ProducerConfiguration::getBatchingMaxPublishDelayMs,
+             return_value_policy<copy_const_reference>())
+        .def("batching_max_publish_delay_ms", 
&ProducerConfiguration::setBatchingMaxPublishDelayMs,
+             return_self<>())
+        .def("property", &ProducerConfiguration::setProperty, return_self<>())
+        .def("batching_type", &ProducerConfiguration::setBatchingType, 
return_self<>())
+        .def("batching_type", &ProducerConfiguration::getBatchingType)
+        .def("encryption_key", &ProducerConfiguration::addEncryptionKey, 
return_self<>())
+        .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, 
return_self<>());
 
     class_<ConsumerConfiguration>("ConsumerConfiguration")
-            .def("consumer_type", &ConsumerConfiguration::getConsumerType)
-            .def("consumer_type", &ConsumerConfiguration::setConsumerType, 
return_self<>())
-            .def("schema", &ConsumerConfiguration::getSchema, 
return_value_policy<copy_const_reference>())
-            .def("schema", &ConsumerConfiguration::setSchema, return_self<>())
-            .def("message_listener", 
&ConsumerConfiguration_setMessageListener, return_self<>())
-            .def("receiver_queue_size", 
&ConsumerConfiguration::getReceiverQueueSize)
-            .def("receiver_queue_size", 
&ConsumerConfiguration::setReceiverQueueSize)
-            .def("max_total_receiver_queue_size_across_partitions", 
&ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions)
-            .def("max_total_receiver_queue_size_across_partitions", 
&ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
-            .def("consumer_name", &ConsumerConfiguration::getConsumerName, 
return_value_policy<copy_const_reference>())
-            .def("consumer_name", &ConsumerConfiguration::setConsumerName)
-            .def("unacked_messages_timeout_ms", 
&ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
-            .def("unacked_messages_timeout_ms", 
&ConsumerConfiguration::setUnAckedMessagesTimeoutMs)
-            .def("negative_ack_redelivery_delay_ms", 
&ConsumerConfiguration::getNegativeAckRedeliveryDelayMs)
-            .def("negative_ack_redelivery_delay_ms", 
&ConsumerConfiguration::setNegativeAckRedeliveryDelayMs)
-            .def("broker_consumer_stats_cache_time_ms", 
&ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs)
-            .def("broker_consumer_stats_cache_time_ms", 
&ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
-            .def("pattern_auto_discovery_period", 
&ConsumerConfiguration::getPatternAutoDiscoveryPeriod)
-            .def("pattern_auto_discovery_period", 
&ConsumerConfiguration::setPatternAutoDiscoveryPeriod)
-            .def("read_compacted", &ConsumerConfiguration::isReadCompacted)
-            .def("read_compacted", &ConsumerConfiguration::setReadCompacted)
-            .def("property", &ConsumerConfiguration::setProperty, 
return_self<>())
-            .def("subscription_initial_position", 
&ConsumerConfiguration::getSubscriptionInitialPosition)
-            .def("subscription_initial_position", 
&ConsumerConfiguration::setSubscriptionInitialPosition)
-            .def("crypto_key_reader", 
&ConsumerConfiguration_setCryptoKeyReader, return_self<>())
-            .def("replicate_subscription_state_enabled", 
&ConsumerConfiguration::setReplicateSubscriptionStateEnabled)
-            .def("replicate_subscription_state_enabled", 
&ConsumerConfiguration::isReplicateSubscriptionStateEnabled)
-            ;
+        .def("consumer_type", &ConsumerConfiguration::getConsumerType)
+        .def("consumer_type", &ConsumerConfiguration::setConsumerType, 
return_self<>())
+        .def("schema", &ConsumerConfiguration::getSchema, 
return_value_policy<copy_const_reference>())
+        .def("schema", &ConsumerConfiguration::setSchema, return_self<>())
+        .def("message_listener", &ConsumerConfiguration_setMessageListener, 
return_self<>())
+        .def("receiver_queue_size", 
&ConsumerConfiguration::getReceiverQueueSize)
+        .def("receiver_queue_size", 
&ConsumerConfiguration::setReceiverQueueSize)
+        .def("max_total_receiver_queue_size_across_partitions",
+             
&ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions)
+        .def("max_total_receiver_queue_size_across_partitions",
+             
&ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
+        .def("consumer_name", &ConsumerConfiguration::getConsumerName,
+             return_value_policy<copy_const_reference>())
+        .def("consumer_name", &ConsumerConfiguration::setConsumerName)
+        .def("unacked_messages_timeout_ms", 
&ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
+        .def("unacked_messages_timeout_ms", 
&ConsumerConfiguration::setUnAckedMessagesTimeoutMs)
+        .def("negative_ack_redelivery_delay_ms", 
&ConsumerConfiguration::getNegativeAckRedeliveryDelayMs)
+        .def("negative_ack_redelivery_delay_ms", 
&ConsumerConfiguration::setNegativeAckRedeliveryDelayMs)
+        .def("broker_consumer_stats_cache_time_ms",
+             &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs)
+        .def("broker_consumer_stats_cache_time_ms",
+             &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
+        .def("pattern_auto_discovery_period", 
&ConsumerConfiguration::getPatternAutoDiscoveryPeriod)
+        .def("pattern_auto_discovery_period", 
&ConsumerConfiguration::setPatternAutoDiscoveryPeriod)
+        .def("read_compacted", &ConsumerConfiguration::isReadCompacted)
+        .def("read_compacted", &ConsumerConfiguration::setReadCompacted)
+        .def("property", &ConsumerConfiguration::setProperty, return_self<>())
+        .def("subscription_initial_position", 
&ConsumerConfiguration::getSubscriptionInitialPosition)
+        .def("subscription_initial_position", 
&ConsumerConfiguration::setSubscriptionInitialPosition)
+        .def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, 
return_self<>())
+        .def("replicate_subscription_state_enabled",
+             &ConsumerConfiguration::setReplicateSubscriptionStateEnabled)
+        .def("replicate_subscription_state_enabled",
+             &ConsumerConfiguration::isReplicateSubscriptionStateEnabled);
 
     class_<ReaderConfiguration>("ReaderConfiguration")
-            .def("reader_listener", &ReaderConfiguration_setReaderListener, 
return_self<>())
-            .def("schema", &ReaderConfiguration::getSchema, 
return_value_policy<copy_const_reference>())
-            .def("schema", &ReaderConfiguration::setSchema, return_self<>())
-            .def("receiver_queue_size", 
&ReaderConfiguration::getReceiverQueueSize)
-            .def("receiver_queue_size", 
&ReaderConfiguration::setReceiverQueueSize)
-            .def("reader_name", &ReaderConfiguration::getReaderName, 
return_value_policy<copy_const_reference>())
-            .def("reader_name", &ReaderConfiguration::setReaderName)
-            .def("subscription_role_prefix", 
&ReaderConfiguration::getSubscriptionRolePrefix, 
return_value_policy<copy_const_reference>())
-            .def("subscription_role_prefix", 
&ReaderConfiguration::setSubscriptionRolePrefix)
-            .def("read_compacted", &ReaderConfiguration::isReadCompacted)
-            .def("read_compacted", &ReaderConfiguration::setReadCompacted)
-            .def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, 
return_self<>())
-            ;
+        .def("reader_listener", &ReaderConfiguration_setReaderListener, 
return_self<>())
+        .def("schema", &ReaderConfiguration::getSchema, 
return_value_policy<copy_const_reference>())
+        .def("schema", &ReaderConfiguration::setSchema, return_self<>())
+        .def("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize)
+        .def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize)
+        .def("reader_name", &ReaderConfiguration::getReaderName, 
return_value_policy<copy_const_reference>())
+        .def("reader_name", &ReaderConfiguration::setReaderName)
+        .def("subscription_role_prefix", 
&ReaderConfiguration::getSubscriptionRolePrefix,
+             return_value_policy<copy_const_reference>())
+        .def("subscription_role_prefix", 
&ReaderConfiguration::setSubscriptionRolePrefix)
+        .def("read_compacted", &ReaderConfiguration::isReadCompacted)
+        .def("read_compacted", &ReaderConfiguration::setReadCompacted)
+        .def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, 
return_self<>());
 }
diff --git a/pulsar-client-cpp/python/src/consumer.cc b/pulsar-client-cpp/python/src/consumer.cc
index 815282d..28bedad 100644
--- a/pulsar-client-cpp/python/src/consumer.cc
+++ b/pulsar-client-cpp/python/src/consumer.cc
@@ -20,11 +20,10 @@
 
 void Consumer_unsubscribe(Consumer& consumer) {
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = consumer.unsubscribe();
+    Py_BEGIN_ALLOW_THREADS res = consumer.unsubscribe();
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 }
 
 Message Consumer_receive(Consumer& consumer) {
@@ -32,11 +31,10 @@ Message Consumer_receive(Consumer& consumer) {
     Result res;
 
     while (true) {
-        Py_BEGIN_ALLOW_THREADS
-        res = consumer.receive(msg);
+        Py_BEGIN_ALLOW_THREADS res = consumer.receive(msg);
         Py_END_ALLOW_THREADS
 
-        if (res != ResultTimeout) {
+            if (res != ResultTimeout) {
             // In case of timeout we keep calling receive() to simulate a
             // blocking call until a message is available, while breaking
             // every once in a while to check the Python signal status
@@ -56,17 +54,14 @@ Message Consumer_receive(Consumer& consumer) {
 Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
     Message msg;
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = consumer.receive(msg, timeoutMs);
+    Py_BEGIN_ALLOW_THREADS res = consumer.receive(msg, timeoutMs);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
     return msg;
 }
 
-void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
-    consumer.acknowledgeAsync(msg, nullptr);
-}
+void Consumer_acknowledge(Consumer& consumer, const Message& msg) { 
consumer.acknowledgeAsync(msg, nullptr); }
 
 void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& 
msgId) {
     consumer.acknowledgeAsync(msgId, nullptr);
@@ -77,7 +72,7 @@ void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
 }
 
 void Consumer_negative_acknowledge_message_id(Consumer& consumer, const 
MessageId& msgId) {
-     consumer.negativeAcknowledge(msgId);
+    consumer.negativeAcknowledge(msgId);
 }
 
 void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
@@ -90,60 +85,52 @@ void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const Messag
 
 void Consumer_close(Consumer& consumer) {
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = consumer.close();
+    Py_BEGIN_ALLOW_THREADS res = consumer.close();
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 }
 
-void Consumer_pauseMessageListener(Consumer& consumer) {
-    CHECK_RESULT(consumer.pauseMessageListener());
-}
+void Consumer_pauseMessageListener(Consumer& consumer) { 
CHECK_RESULT(consumer.pauseMessageListener()); }
 
-void Consumer_resumeMessageListener(Consumer& consumer) {
-    CHECK_RESULT(consumer.resumeMessageListener());
-}
+void Consumer_resumeMessageListener(Consumer& consumer) { 
CHECK_RESULT(consumer.resumeMessageListener()); }
 
 void Consumer_seek(Consumer& consumer, const MessageId& msgId) {
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = consumer.seek(msgId);
+    Py_BEGIN_ALLOW_THREADS res = consumer.seek(msgId);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 }
 
 void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) {
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = consumer.seek(timestamp);
+    Py_BEGIN_ALLOW_THREADS res = consumer.seek(timestamp);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 }
 
 void export_consumer() {
     using namespace boost::python;
 
     class_<Consumer>("Consumer", no_init)
-            .def("topic", &Consumer::getTopic, "return the topic this consumer 
is subscribed to",
-                 return_value_policy<copy_const_reference>())
-            .def("subscription_name", &Consumer::getSubscriptionName, 
return_value_policy<copy_const_reference>())
-            .def("unsubscribe", &Consumer_unsubscribe)
-            .def("receive", &Consumer_receive)
-            .def("receive", &Consumer_receive_timeout)
-            .def("acknowledge", &Consumer_acknowledge)
-            .def("acknowledge", &Consumer_acknowledge_message_id)
-            .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative)
-            .def("acknowledge_cumulative", 
&Consumer_acknowledge_cumulative_message_id)
-            .def("negative_acknowledge", &Consumer_negative_acknowledge)
-            .def("negative_acknowledge", 
&Consumer_negative_acknowledge_message_id)
-            .def("close", &Consumer_close)
-            .def("pause_message_listener", &Consumer_pauseMessageListener)
-            .def("resume_message_listener", &Consumer_resumeMessageListener)
-            .def("redeliver_unacknowledged_messages", 
&Consumer::redeliverUnacknowledgedMessages)
-            .def("seek", &Consumer_seek)
-            .def("seek", &Consumer_seek_timestamp)
-            ;
+        .def("topic", &Consumer::getTopic, "return the topic this consumer is 
subscribed to",
+             return_value_policy<copy_const_reference>())
+        .def("subscription_name", &Consumer::getSubscriptionName, 
return_value_policy<copy_const_reference>())
+        .def("unsubscribe", &Consumer_unsubscribe)
+        .def("receive", &Consumer_receive)
+        .def("receive", &Consumer_receive_timeout)
+        .def("acknowledge", &Consumer_acknowledge)
+        .def("acknowledge", &Consumer_acknowledge_message_id)
+        .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative)
+        .def("acknowledge_cumulative", 
&Consumer_acknowledge_cumulative_message_id)
+        .def("negative_acknowledge", &Consumer_negative_acknowledge)
+        .def("negative_acknowledge", &Consumer_negative_acknowledge_message_id)
+        .def("close", &Consumer_close)
+        .def("pause_message_listener", &Consumer_pauseMessageListener)
+        .def("resume_message_listener", &Consumer_resumeMessageListener)
+        .def("redeliver_unacknowledged_messages", 
&Consumer::redeliverUnacknowledgedMessages)
+        .def("seek", &Consumer_seek)
+        .def("seek", &Consumer_seek_timestamp);
 }
diff --git a/pulsar-client-cpp/python/src/cryptoKeyReader.cc b/pulsar-client-cpp/python/src/cryptoKeyReader.cc
index ccefe6f..2c46b6f 100644
--- a/pulsar-client-cpp/python/src/cryptoKeyReader.cc
+++ b/pulsar-client-cpp/python/src/cryptoKeyReader.cc
@@ -21,7 +21,7 @@
 CryptoKeyReaderWrapper::CryptoKeyReaderWrapper() {}
 
 CryptoKeyReaderWrapper::CryptoKeyReaderWrapper(const std::string& 
publicKeyPath,
-                                             const std::string& 
privateKeyPath) {
+                                               const std::string& 
privateKeyPath) {
     this->cryptoKeyReader = DefaultCryptoKeyReader::create(publicKeyPath, 
privateKeyPath);
 }
 
diff --git a/pulsar-client-cpp/python/src/enums.cc b/pulsar-client-cpp/python/src/enums.cc
index c23b211..1b21af5 100644
--- a/pulsar-client-cpp/python/src/enums.cc
+++ b/pulsar-client-cpp/python/src/enums.cc
@@ -18,104 +18,96 @@
  */
 #include "utils.h"
 
-
 void export_enums() {
     using namespace boost::python;
 
     
enum_<ProducerConfiguration::PartitionsRoutingMode>("PartitionsRoutingMode")
-            .value("UseSinglePartition", 
ProducerConfiguration::UseSinglePartition)
-            .value("RoundRobinDistribution", 
ProducerConfiguration::RoundRobinDistribution)
-            .value("CustomPartition", ProducerConfiguration::CustomPartition)
-            ;
+        .value("UseSinglePartition", ProducerConfiguration::UseSinglePartition)
+        .value("RoundRobinDistribution", 
ProducerConfiguration::RoundRobinDistribution)
+        .value("CustomPartition", ProducerConfiguration::CustomPartition);
 
     enum_<CompressionType>("CompressionType")
-            .value("NONE", CompressionNone) // Don't use 'None' since it's a 
keyword in py3
-            .value("LZ4", CompressionLZ4)
-            .value("ZLib", CompressionZLib)
-            .value("ZSTD", CompressionZSTD)
-            .value("SNAPPY", CompressionSNAPPY)
-            ;
+        .value("NONE", CompressionNone)  // Don't use 'None' since it's a 
keyword in py3
+        .value("LZ4", CompressionLZ4)
+        .value("ZLib", CompressionZLib)
+        .value("ZSTD", CompressionZSTD)
+        .value("SNAPPY", CompressionSNAPPY);
 
     enum_<ConsumerType>("ConsumerType")
-            .value("Exclusive", ConsumerExclusive)
-            .value("Shared", ConsumerShared)
-            .value("Failover", ConsumerFailover)
-            .value("KeyShared", ConsumerKeyShared)
-            ;
+        .value("Exclusive", ConsumerExclusive)
+        .value("Shared", ConsumerShared)
+        .value("Failover", ConsumerFailover)
+        .value("KeyShared", ConsumerKeyShared);
 
-    enum_<Result >("Result", "Collection of return codes")
-            .value("Ok", ResultOk)
-            .value("UnknownError", ResultUnknownError)
-            .value("InvalidConfiguration", ResultInvalidConfiguration)
-            .value("Timeout", ResultTimeout)
-            .value("LookupError", ResultLookupError)
-            .value("ConnectError", ResultConnectError)
-            .value("ReadError", ResultReadError)
-            .value("AuthenticationError", ResultAuthenticationError)
-            .value("AuthorizationError", ResultAuthorizationError)
-            .value("ErrorGettingAuthenticationData", 
ResultErrorGettingAuthenticationData)
-            .value("BrokerMetadataError", ResultBrokerMetadataError)
-            .value("BrokerPersistenceError", ResultBrokerPersistenceError)
-            .value("ChecksumError", ResultChecksumError)
-            .value("ConsumerBusy", ResultConsumerBusy)
-            .value("NotConnected", ResultNotConnected)
-            .value("AlreadyClosed", ResultAlreadyClosed)
-            .value("InvalidMessage", ResultInvalidMessage)
-            .value("ConsumerNotInitialized", ResultConsumerNotInitialized)
-            .value("ProducerNotInitialized", ResultProducerNotInitialized)
-            .value("ProducerBusy", ResultProducerBusy)
-            .value("TooManyLookupRequestException", 
ResultTooManyLookupRequestException)
-            .value("InvalidTopicName", ResultInvalidTopicName)
-            .value("InvalidUrl", ResultInvalidUrl)
-            .value("ServiceUnitNotReady", ResultServiceUnitNotReady)
-            .value("OperationNotSupported", ResultOperationNotSupported)
-            .value("ProducerBlockedQuotaExceededError", 
ResultProducerBlockedQuotaExceededError)
-            .value("ProducerBlockedQuotaExceededException", 
ResultProducerBlockedQuotaExceededException)
-            .value("ProducerQueueIsFull", ResultProducerQueueIsFull)
-            .value("MessageTooBig", ResultMessageTooBig)
-            .value("TopicNotFound", ResultTopicNotFound)
-            .value("SubscriptionNotFound", ResultSubscriptionNotFound)
-            .value("ConsumerNotFound", ResultConsumerNotFound)
-            .value("UnsupportedVersionError", ResultUnsupportedVersionError)
-            .value("TopicTerminated", ResultTopicTerminated)
-            .value("CryptoError", ResultCryptoError)
-            .value("IncompatibleSchema", ResultIncompatibleSchema)
-            .value("ConsumerAssignError", ResultConsumerAssignError)
-            .value("CumulativeAcknowledgementNotAllowedError", 
ResultCumulativeAcknowledgementNotAllowedError)
-            .value("TransactionCoordinatorNotFoundError", 
ResultTransactionCoordinatorNotFoundError)
-            .value("InvalidTxnStatusError", ResultInvalidTxnStatusError)
-            .value("NotAllowedError", ResultNotAllowedError)
-            .value("TransactionConflict", ResultTransactionConflict)
-            .value("TransactionNotFound", ResultTransactionNotFound)
-            .value("ProducerFenced", ResultProducerFenced)
-            .value("MemoryBufferIsFull", ResultMemoryBufferIsFull)
-            ;
+    enum_<Result>("Result", "Collection of return codes")
+        .value("Ok", ResultOk)
+        .value("UnknownError", ResultUnknownError)
+        .value("InvalidConfiguration", ResultInvalidConfiguration)
+        .value("Timeout", ResultTimeout)
+        .value("LookupError", ResultLookupError)
+        .value("ConnectError", ResultConnectError)
+        .value("ReadError", ResultReadError)
+        .value("AuthenticationError", ResultAuthenticationError)
+        .value("AuthorizationError", ResultAuthorizationError)
+        .value("ErrorGettingAuthenticationData", 
ResultErrorGettingAuthenticationData)
+        .value("BrokerMetadataError", ResultBrokerMetadataError)
+        .value("BrokerPersistenceError", ResultBrokerPersistenceError)
+        .value("ChecksumError", ResultChecksumError)
+        .value("ConsumerBusy", ResultConsumerBusy)
+        .value("NotConnected", ResultNotConnected)
+        .value("AlreadyClosed", ResultAlreadyClosed)
+        .value("InvalidMessage", ResultInvalidMessage)
+        .value("ConsumerNotInitialized", ResultConsumerNotInitialized)
+        .value("ProducerNotInitialized", ResultProducerNotInitialized)
+        .value("ProducerBusy", ResultProducerBusy)
+        .value("TooManyLookupRequestException", 
ResultTooManyLookupRequestException)
+        .value("InvalidTopicName", ResultInvalidTopicName)
+        .value("InvalidUrl", ResultInvalidUrl)
+        .value("ServiceUnitNotReady", ResultServiceUnitNotReady)
+        .value("OperationNotSupported", ResultOperationNotSupported)
+        .value("ProducerBlockedQuotaExceededError", 
ResultProducerBlockedQuotaExceededError)
+        .value("ProducerBlockedQuotaExceededException", 
ResultProducerBlockedQuotaExceededException)
+        .value("ProducerQueueIsFull", ResultProducerQueueIsFull)
+        .value("MessageTooBig", ResultMessageTooBig)
+        .value("TopicNotFound", ResultTopicNotFound)
+        .value("SubscriptionNotFound", ResultSubscriptionNotFound)
+        .value("ConsumerNotFound", ResultConsumerNotFound)
+        .value("UnsupportedVersionError", ResultUnsupportedVersionError)
+        .value("TopicTerminated", ResultTopicTerminated)
+        .value("CryptoError", ResultCryptoError)
+        .value("IncompatibleSchema", ResultIncompatibleSchema)
+        .value("ConsumerAssignError", ResultConsumerAssignError)
+        .value("CumulativeAcknowledgementNotAllowedError", 
ResultCumulativeAcknowledgementNotAllowedError)
+        .value("TransactionCoordinatorNotFoundError", 
ResultTransactionCoordinatorNotFoundError)
+        .value("InvalidTxnStatusError", ResultInvalidTxnStatusError)
+        .value("NotAllowedError", ResultNotAllowedError)
+        .value("TransactionConflict", ResultTransactionConflict)
+        .value("TransactionNotFound", ResultTransactionNotFound)
+        .value("ProducerFenced", ResultProducerFenced)
+        .value("MemoryBufferIsFull", ResultMemoryBufferIsFull);
 
     enum_<SchemaType>("SchemaType", "Supported schema types")
-            .value("NONE", pulsar::NONE)
-            .value("STRING", pulsar::STRING)
-            .value("INT8", pulsar::INT8)
-            .value("INT16", pulsar::INT16)
-            .value("INT32", pulsar::INT32)
-            .value("INT64", pulsar::INT64)
-            .value("FLOAT", pulsar::FLOAT)
-            .value("DOUBLE", pulsar::DOUBLE)
-            .value("BYTES", pulsar::BYTES)
-            .value("JSON", pulsar::JSON)
-            .value("PROTOBUF", pulsar::PROTOBUF)
-            .value("AVRO", pulsar::AVRO)
-            .value("AUTO_CONSUME", pulsar::AUTO_CONSUME)
-            .value("AUTO_PUBLISH", pulsar::AUTO_PUBLISH)
-            .value("KEY_VALUE", pulsar::KEY_VALUE)
-            ;
+        .value("NONE", pulsar::NONE)
+        .value("STRING", pulsar::STRING)
+        .value("INT8", pulsar::INT8)
+        .value("INT16", pulsar::INT16)
+        .value("INT32", pulsar::INT32)
+        .value("INT64", pulsar::INT64)
+        .value("FLOAT", pulsar::FLOAT)
+        .value("DOUBLE", pulsar::DOUBLE)
+        .value("BYTES", pulsar::BYTES)
+        .value("JSON", pulsar::JSON)
+        .value("PROTOBUF", pulsar::PROTOBUF)
+        .value("AVRO", pulsar::AVRO)
+        .value("AUTO_CONSUME", pulsar::AUTO_CONSUME)
+        .value("AUTO_PUBLISH", pulsar::AUTO_PUBLISH)
+        .value("KEY_VALUE", pulsar::KEY_VALUE);
 
     enum_<InitialPosition>("InitialPosition", "Supported initial position")
-            .value("Latest", InitialPositionLatest)
-            .value("Earliest", InitialPositionEarliest)
-            ;
+        .value("Latest", InitialPositionLatest)
+        .value("Earliest", InitialPositionEarliest);
 
     enum_<ProducerConfiguration::BatchingType>("BatchingType", "Supported 
batching types")
-            .value("Default", ProducerConfiguration::DefaultBatching)
-            .value("KeyBased", ProducerConfiguration::KeyBasedBatching)
-            ;
+        .value("Default", ProducerConfiguration::DefaultBatching)
+        .value("KeyBased", ProducerConfiguration::KeyBasedBatching);
 }
diff --git a/pulsar-client-cpp/python/src/exceptions.cc b/pulsar-client-cpp/python/src/exceptions.cc
index c39b52d..25b3bd0 100644
--- a/pulsar-client-cpp/python/src/exceptions.cc
+++ b/pulsar-client-cpp/python/src/exceptions.cc
@@ -29,16 +29,13 @@ PyObject* createExceptionClass(const char* name, PyObject* baseTypeObj = PyExc_E
     std::string fullName = "_pulsar.";
     fullName += name;
 
-    PyObject* typeObj = PyErr_NewException(const_cast<char*>(fullName.c_str()),
-                                           baseTypeObj, nullptr);
+    PyObject* typeObj = 
PyErr_NewException(const_cast<char*>(fullName.c_str()), baseTypeObj, nullptr);
     if (!typeObj) throw_error_already_set();
     scope().attr(name) = handle<>(borrowed(typeObj));
     return typeObj;
 }
 
-PyObject* get_exception_class(Result result) {
-    return exceptions[result];
-}
+PyObject* get_exception_class(Result result) { return exceptions[result]; }
 
 void export_exceptions() {
     using namespace boost::python;
@@ -46,44 +43,58 @@ void export_exceptions() {
     basePulsarException = createExceptionClass("PulsarException");
 
     exceptions[ResultUnknownError] = createExceptionClass("UnknownError", 
basePulsarException);
-    exceptions[ResultInvalidConfiguration] = 
createExceptionClass("InvalidConfiguration", basePulsarException);
+    exceptions[ResultInvalidConfiguration] =
+        createExceptionClass("InvalidConfiguration", basePulsarException);
     exceptions[ResultTimeout] = createExceptionClass("Timeout", 
basePulsarException);
     exceptions[ResultLookupError] = createExceptionClass("LookupError", 
basePulsarException);
     exceptions[ResultConnectError] = createExceptionClass("ConnectError", 
basePulsarException);
     exceptions[ResultReadError] = createExceptionClass("ReadError", 
basePulsarException);
     exceptions[ResultAuthenticationError] = 
createExceptionClass("AuthenticationError", basePulsarException);
     exceptions[ResultAuthorizationError] = 
createExceptionClass("AuthorizationError", basePulsarException);
-    exceptions[ResultErrorGettingAuthenticationData] = 
createExceptionClass("ErrorGettingAuthenticationData", basePulsarException);
+    exceptions[ResultErrorGettingAuthenticationData] =
+        createExceptionClass("ErrorGettingAuthenticationData", 
basePulsarException);
     exceptions[ResultBrokerMetadataError] = 
createExceptionClass("BrokerMetadataError", basePulsarException);
-    exceptions[ResultBrokerPersistenceError] = 
createExceptionClass("BrokerPersistenceError", basePulsarException);
+    exceptions[ResultBrokerPersistenceError] =
+        createExceptionClass("BrokerPersistenceError", basePulsarException);
     exceptions[ResultChecksumError] = createExceptionClass("ChecksumError", 
basePulsarException);
     exceptions[ResultConsumerBusy] = createExceptionClass("ConsumerBusy", 
basePulsarException);
     exceptions[ResultNotConnected] = createExceptionClass("NotConnected", 
basePulsarException);
     exceptions[ResultAlreadyClosed] = createExceptionClass("AlreadyClosed", 
basePulsarException);
     exceptions[ResultInvalidMessage] = createExceptionClass("InvalidMessage", 
basePulsarException);
-    exceptions[ResultConsumerNotInitialized] = 
createExceptionClass("ConsumerNotInitialized", basePulsarException);
-    exceptions[ResultProducerNotInitialized] = 
createExceptionClass("ProducerNotInitialized", basePulsarException);
+    exceptions[ResultConsumerNotInitialized] =
+        createExceptionClass("ConsumerNotInitialized", basePulsarException);
+    exceptions[ResultProducerNotInitialized] =
+        createExceptionClass("ProducerNotInitialized", basePulsarException);
     exceptions[ResultProducerBusy] = createExceptionClass("ProducerBusy", 
basePulsarException);
-    exceptions[ResultTooManyLookupRequestException] = 
createExceptionClass("TooManyLookupRequestException", basePulsarException);
+    exceptions[ResultTooManyLookupRequestException] =
+        createExceptionClass("TooManyLookupRequestException", 
basePulsarException);
     exceptions[ResultInvalidTopicName] = 
createExceptionClass("InvalidTopicName", basePulsarException);
     exceptions[ResultInvalidUrl] = createExceptionClass("InvalidUrl", 
basePulsarException);
     exceptions[ResultServiceUnitNotReady] = 
createExceptionClass("ServiceUnitNotReady", basePulsarException);
-    exceptions[ResultOperationNotSupported] = 
createExceptionClass("OperationNotSupported", basePulsarException);
-    exceptions[ResultProducerBlockedQuotaExceededError] = 
createExceptionClass("ProducerBlockedQuotaExceededError", basePulsarException);
-    exceptions[ResultProducerBlockedQuotaExceededException] = 
createExceptionClass("ProducerBlockedQuotaExceededException", 
basePulsarException);
+    exceptions[ResultOperationNotSupported] =
+        createExceptionClass("OperationNotSupported", basePulsarException);
+    exceptions[ResultProducerBlockedQuotaExceededError] =
+        createExceptionClass("ProducerBlockedQuotaExceededError", 
basePulsarException);
+    exceptions[ResultProducerBlockedQuotaExceededException] =
+        createExceptionClass("ProducerBlockedQuotaExceededException", 
basePulsarException);
     exceptions[ResultProducerQueueIsFull] = 
createExceptionClass("ProducerQueueIsFull", basePulsarException);
     exceptions[ResultMessageTooBig] = createExceptionClass("MessageTooBig", 
basePulsarException);
     exceptions[ResultTopicNotFound] = createExceptionClass("TopicNotFound", 
basePulsarException);
-    exceptions[ResultSubscriptionNotFound] = 
createExceptionClass("SubscriptionNotFound", basePulsarException);
+    exceptions[ResultSubscriptionNotFound] =
+        createExceptionClass("SubscriptionNotFound", basePulsarException);
     exceptions[ResultConsumerNotFound] = 
createExceptionClass("ConsumerNotFound", basePulsarException);
-    exceptions[ResultUnsupportedVersionError] = 
createExceptionClass("UnsupportedVersionError", basePulsarException);
+    exceptions[ResultUnsupportedVersionError] =
+        createExceptionClass("UnsupportedVersionError", basePulsarException);
     exceptions[ResultTopicTerminated] = 
createExceptionClass("TopicTerminated", basePulsarException);
     exceptions[ResultCryptoError] = createExceptionClass("CryptoError", 
basePulsarException);
     exceptions[ResultIncompatibleSchema] = 
createExceptionClass("IncompatibleSchema", basePulsarException);
     exceptions[ResultConsumerAssignError] = 
createExceptionClass("ConsumerAssignError", basePulsarException);
-    exceptions[ResultCumulativeAcknowledgementNotAllowedError] = 
createExceptionClass("CumulativeAcknowledgementNotAllowedError", 
basePulsarException);
-    exceptions[ResultTransactionCoordinatorNotFoundError] = 
createExceptionClass("TransactionCoordinatorNotFoundError", 
basePulsarException);
-    exceptions[ResultInvalidTxnStatusError] = 
createExceptionClass("InvalidTxnStatusError", basePulsarException);
+    exceptions[ResultCumulativeAcknowledgementNotAllowedError] =
+        createExceptionClass("CumulativeAcknowledgementNotAllowedError", 
basePulsarException);
+    exceptions[ResultTransactionCoordinatorNotFoundError] =
+        createExceptionClass("TransactionCoordinatorNotFoundError", 
basePulsarException);
+    exceptions[ResultInvalidTxnStatusError] =
+        createExceptionClass("InvalidTxnStatusError", basePulsarException);
     exceptions[ResultNotAllowedError] = 
createExceptionClass("NotAllowedError", basePulsarException);
     exceptions[ResultTransactionConflict] = 
createExceptionClass("TransactionConflict", basePulsarException);
     exceptions[ResultTransactionNotFound] = 
createExceptionClass("TransactionNotFound", basePulsarException);
diff --git a/pulsar-client-cpp/python/src/message.cc b/pulsar-client-cpp/python/src/message.cc
index 8532966..b93380b 100644
--- a/pulsar-client-cpp/python/src/message.cc
+++ b/pulsar-client-cpp/python/src/message.cc
@@ -28,34 +28,23 @@ std::string MessageId_str(const MessageId& msgId) {
     return ss.str();
 }
 
-bool MessageId_eq(const MessageId& a, const MessageId& b) {
-    return a == b;
-}
+bool MessageId_eq(const MessageId& a, const MessageId& b) { return a == b; }
 
-bool MessageId_ne(const MessageId& a, const MessageId& b) {
-    return a != b;
-}
+bool MessageId_ne(const MessageId& a, const MessageId& b) { return a != b; }
 
-bool MessageId_lt(const MessageId& a, const MessageId& b) {
-    return a < b;
-}
+bool MessageId_lt(const MessageId& a, const MessageId& b) { return a < b; }
 
-bool MessageId_le(const MessageId& a, const MessageId& b) {
-    return a <= b;
-}
+bool MessageId_le(const MessageId& a, const MessageId& b) { return a <= b; }
 
-bool MessageId_gt(const MessageId& a, const MessageId& b) {
-    return a > b;
-}
+bool MessageId_gt(const MessageId& a, const MessageId& b) { return a > b; }
 
-bool MessageId_ge(const MessageId& a, const MessageId& b) {
-    return a >= b;
-}
+bool MessageId_ge(const MessageId& a, const MessageId& b) { return a >= b; }
 
 boost::python::object MessageId_serialize(const MessageId& msgId) {
     std::string serialized;
     msgId.serialize(serialized);
-    return boost::python::object(boost::python::handle<>(PyBytes_FromStringAndSize(serialized.c_str(), serialized.length())));
+    return boost::python::object(
+        boost::python::handle<>(PyBytes_FromStringAndSize(serialized.c_str(), 
serialized.length())));
 }
 
 std::string Message_str(const Message& msg) {
@@ -65,7 +54,8 @@ std::string Message_str(const Message& msg) {
 }
 
 boost::python::object Message_data(const Message& msg) {
-    return 
boost::python::object(boost::python::handle<>(PyBytes_FromStringAndSize((const 
char*)msg.getData(), msg.getLength())));
+    return boost::python::object(
+        boost::python::handle<>(PyBytes_FromStringAndSize((const 
char*)msg.getData(), msg.getLength())));
 }
 
 boost::python::object Message_properties(const Message& msg) {
@@ -88,9 +78,7 @@ std::string schema_version_str(const Message& msg) {
     return ss.str();
 }
 
-const MessageId& Message_getMessageId(const Message& msg) {
-    return msg.getMessageId();
-}
+const MessageId& Message_getMessageId(const Message& msg) { return 
msg.getMessageId(); }
 
 void deliverAfter(MessageBuilder* const builder, PyObject* obj_delta) {
     PyDateTime_Delta const* pydelta = 
reinterpret_cast<PyDateTime_Delta*>(obj_delta);
@@ -102,12 +90,9 @@ void deliverAfter(MessageBuilder* const builder, PyObject* obj_delta) {
     }
 
     // Create chrono duration object
-    std::chrono::milliseconds
-        duration = std::chrono::duration_cast<std::chrono::milliseconds>(
-                std::chrono::hours(24)*days
-                + std::chrono::seconds(pydelta->seconds)
-                + std::chrono::microseconds(pydelta->microseconds)
-                );
+    std::chrono::milliseconds duration = 
std::chrono::duration_cast<std::chrono::milliseconds>(
+        std::chrono::hours(24) * days + std::chrono::seconds(pydelta->seconds) 
+
+        std::chrono::microseconds(pydelta->microseconds));
 
     if (is_negative) {
         duration = duration * -1;
@@ -121,70 +106,66 @@ void export_message() {
 
     PyDateTime_IMPORT;
 
-    MessageBuilder& (MessageBuilder::*MessageBuilderSetContentString)(const 
std::string&) = &MessageBuilder::setContent;
+    MessageBuilder& (MessageBuilder::*MessageBuilderSetContentString)(const 
std::string&) =
+        &MessageBuilder::setContent;
 
     class_<MessageBuilder, boost::noncopyable>("MessageBuilder")
-            .def("content", MessageBuilderSetContentString, return_self<>())
-            .def("property", &MessageBuilder::setProperty, return_self<>())
-            .def("properties", &MessageBuilder::setProperties, return_self<>())
-            .def("sequence_id", &MessageBuilder::setSequenceId, 
return_self<>())
-            .def("deliver_after", &deliverAfter, return_self<>())
-            .def("deliver_at", &MessageBuilder::setDeliverAt, return_self<>())
-            .def("partition_key", &MessageBuilder::setPartitionKey, 
return_self<>())
-            .def("event_timestamp", &MessageBuilder::setEventTimestamp, 
return_self<>())
-            .def("replication_clusters", 
&MessageBuilder::setReplicationClusters, return_self<>())
-            .def("disable_replication", &MessageBuilder::disableReplication, 
return_self<>())
-            .def("build", &MessageBuilder::build)
-            ;
-
-    class_<Message::StringMap>("MessageStringMap")
-            .def(map_indexing_suite<Message::StringMap>())
-            ;
+        .def("content", MessageBuilderSetContentString, return_self<>())
+        .def("property", &MessageBuilder::setProperty, return_self<>())
+        .def("properties", &MessageBuilder::setProperties, return_self<>())
+        .def("sequence_id", &MessageBuilder::setSequenceId, return_self<>())
+        .def("deliver_after", &deliverAfter, return_self<>())
+        .def("deliver_at", &MessageBuilder::setDeliverAt, return_self<>())
+        .def("partition_key", &MessageBuilder::setPartitionKey, 
return_self<>())
+        .def("event_timestamp", &MessageBuilder::setEventTimestamp, 
return_self<>())
+        .def("replication_clusters", &MessageBuilder::setReplicationClusters, 
return_self<>())
+        .def("disable_replication", &MessageBuilder::disableReplication, 
return_self<>())
+        .def("build", &MessageBuilder::build);
+
+    
class_<Message::StringMap>("MessageStringMap").def(map_indexing_suite<Message::StringMap>());
 
     static const MessageId& _MessageId_earliest = MessageId::earliest();
     static const MessageId& _MessageId_latest = MessageId::latest();
 
     class_<MessageId>("MessageId")
-            .def(init<int32_t, int64_t, int64_t, int32_t>())
-            .def("__str__", &MessageId_str)
-            .def("__eq__", &MessageId_eq)
-            .def("__ne__", &MessageId_ne)
-            .def("__le__", &MessageId_le)
-            .def("__lt__", &MessageId_lt)
-            .def("__ge__", &MessageId_ge)
-            .def("__gt__", &MessageId_gt)
-            .def("ledger_id", &MessageId::ledgerId)
-            .def("entry_id", &MessageId::entryId)
-            .def("batch_index", &MessageId::batchIndex)
-            .def("partition", &MessageId::partition)
-            .add_static_property("earliest", make_getter(&_MessageId_earliest))
-            .add_static_property("latest", make_getter(&_MessageId_latest))
-            .def("serialize", &MessageId_serialize)
-            .def("deserialize", 
&MessageId::deserialize).staticmethod("deserialize")
-            ;
+        .def(init<int32_t, int64_t, int64_t, int32_t>())
+        .def("__str__", &MessageId_str)
+        .def("__eq__", &MessageId_eq)
+        .def("__ne__", &MessageId_ne)
+        .def("__le__", &MessageId_le)
+        .def("__lt__", &MessageId_lt)
+        .def("__ge__", &MessageId_ge)
+        .def("__gt__", &MessageId_gt)
+        .def("ledger_id", &MessageId::ledgerId)
+        .def("entry_id", &MessageId::entryId)
+        .def("batch_index", &MessageId::batchIndex)
+        .def("partition", &MessageId::partition)
+        .add_static_property("earliest", make_getter(&_MessageId_earliest))
+        .add_static_property("latest", make_getter(&_MessageId_latest))
+        .def("serialize", &MessageId_serialize)
+        .def("deserialize", &MessageId::deserialize)
+        .staticmethod("deserialize");
 
     class_<Message>("Message")
-            .def("properties", &Message_properties)
-            .def("data", &Message_data)
-            .def("length", &Message::getLength)
-            .def("partition_key", &Message::getPartitionKey, 
return_value_policy<copy_const_reference>())
-            .def("publish_timestamp", &Message::getPublishTimestamp)
-            .def("event_timestamp", &Message::getEventTimestamp)
-            .def("message_id", &Message_getMessageId, 
return_value_policy<copy_const_reference>())
-            .def("__str__", &Message_str)
-            .def("topic_name", &Topic_name_str)
-            .def("redelivery_count", &Message::getRedeliveryCount)
-            .def("schema_version", &schema_version_str)
-            ;
-
-    MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const 
std::string& payload, uint32_t batchSize) = &MessageBatch::parseFrom;
+        .def("properties", &Message_properties)
+        .def("data", &Message_data)
+        .def("length", &Message::getLength)
+        .def("partition_key", &Message::getPartitionKey, 
return_value_policy<copy_const_reference>())
+        .def("publish_timestamp", &Message::getPublishTimestamp)
+        .def("event_timestamp", &Message::getEventTimestamp)
+        .def("message_id", &Message_getMessageId, 
return_value_policy<copy_const_reference>())
+        .def("__str__", &Message_str)
+        .def("topic_name", &Topic_name_str)
+        .def("redelivery_count", &Message::getRedeliveryCount)
+        .def("schema_version", &schema_version_str);
+
+    MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const 
std::string& payload,
+                                                               uint32_t 
batchSize) = &MessageBatch::parseFrom;
 
     class_<MessageBatch>("MessageBatch")
-            .def("with_message_id", &MessageBatch::withMessageId, 
return_self<>())
-            .def("parse_from", MessageBatchParseFromString, return_self<>())
-            .def("messages", &MessageBatch::messages, 
return_value_policy<copy_const_reference>())
-            ;
+        .def("with_message_id", &MessageBatch::withMessageId, return_self<>())
+        .def("parse_from", MessageBatchParseFromString, return_self<>())
+        .def("messages", &MessageBatch::messages, 
return_value_policy<copy_const_reference>());
 
-    class_<std::vector<Message> >("Messages")
-        .def(vector_indexing_suite<std::vector<Message> >() );
+    class_<std::vector<Message> 
>("Messages").def(vector_indexing_suite<std::vector<Message> >());
 }
diff --git a/pulsar-client-cpp/python/src/producer.cc b/pulsar-client-cpp/python/src/producer.cc
index 343650f..345639e 100644
--- a/pulsar-client-cpp/python/src/producer.cc
+++ b/pulsar-client-cpp/python/src/producer.cc
@@ -25,11 +25,10 @@ extern boost::python::object MessageId_serialize(const MessageId& msgId);
 boost::python::object Producer_send(Producer& producer, const Message& 
message) {
     Result res;
     MessageId messageId;
-    Py_BEGIN_ALLOW_THREADS
-    res = producer.send(message, messageId);
+    Py_BEGIN_ALLOW_THREADS res = producer.send(message, messageId);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
     return MessageId_serialize(messageId);
 }
 
@@ -54,57 +53,55 @@ void Producer_sendAsync(Producer& producer, const Message& message, py::object c
     PyObject* pyCallback = callback.ptr();
     Py_XINCREF(pyCallback);
 
-    Py_BEGIN_ALLOW_THREADS
-    producer.sendAsync(message, std::bind(Producer_sendAsyncCallback, 
pyCallback,
-            std::placeholders::_1, std::placeholders::_2));
+    Py_BEGIN_ALLOW_THREADS producer.sendAsync(
+        message,
+        std::bind(Producer_sendAsyncCallback, pyCallback, 
std::placeholders::_1, std::placeholders::_2));
     Py_END_ALLOW_THREADS
 }
 
 void Producer_flush(Producer& producer) {
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = producer.flush();
+    Py_BEGIN_ALLOW_THREADS res = producer.flush();
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 }
 
 void Producer_close(Producer& producer) {
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = producer.close();
+    Py_BEGIN_ALLOW_THREADS res = producer.close();
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 }
 
 void export_producer() {
     using namespace boost::python;
 
     class_<Producer>("Producer", no_init)
-            .def("topic", &Producer::getTopic, "return the topic to which 
producer is publishing to",
-                 return_value_policy<copy_const_reference>())
-            .def("producer_name", &Producer::getProducerName,
-                 "return the producer name which could have been assigned by 
the system or specified by the client",
-                 return_value_policy<copy_const_reference>())
-            .def("last_sequence_id", &Producer::getLastSequenceId)
-            .def("send", &Producer_send,
-                 "Publish a message on the topic associated with this 
Producer.\n"
-                         "\n"
-                         "This method will block until the message will be 
accepted and persisted\n"
-                         "by the broker. In case of errors, the client library 
will try to\n"
-                         "automatically recover and use a different broker.\n"
-                         "\n"
-                         "If it wasn't possible to successfully publish the 
message within the sendTimeout,\n"
-                         "an error will be returned.\n"
-                         "\n"
-                         "This method is equivalent to asyncSend() and wait 
until the callback is triggered.\n"
-                         "\n"
-                         "@param msg message to publish\n")
-            .def("send_async", &Producer_sendAsync)
-            .def("flush", &Producer_flush,
-                 "Flush all the messages buffered in the client and wait until 
all messages have been\n"
-                         "successfully persisted\n")
-            .def("close", &Producer_close)
-            ;
+        .def("topic", &Producer::getTopic, "return the topic to which producer 
is publishing to",
+             return_value_policy<copy_const_reference>())
+        .def("producer_name", &Producer::getProducerName,
+             "return the producer name which could have been assigned by the 
system or specified by the "
+             "client",
+             return_value_policy<copy_const_reference>())
+        .def("last_sequence_id", &Producer::getLastSequenceId)
+        .def("send", &Producer_send,
+             "Publish a message on the topic associated with this Producer.\n"
+             "\n"
+             "This method will block until the message will be accepted and 
persisted\n"
+             "by the broker. In case of errors, the client library will try 
to\n"
+             "automatically recover and use a different broker.\n"
+             "\n"
+             "If it wasn't possible to successfully publish the message within 
the sendTimeout,\n"
+             "an error will be returned.\n"
+             "\n"
+             "This method is equivalent to asyncSend() and wait until the 
callback is triggered.\n"
+             "\n"
+             "@param msg message to publish\n")
+        .def("send_async", &Producer_sendAsync)
+        .def("flush", &Producer_flush,
+             "Flush all the messages buffered in the client and wait until all 
messages have been\n"
+             "successfully persisted\n")
+        .def("close", &Producer_close);
 }
diff --git a/pulsar-client-cpp/python/src/pulsar.cc b/pulsar-client-cpp/python/src/pulsar.cc
index a46ce53..e591b73 100644
--- a/pulsar-client-cpp/python/src/pulsar.cc
+++ b/pulsar-client-cpp/python/src/pulsar.cc
@@ -32,7 +32,6 @@ void export_exceptions();
 
 PyObject* get_exception_class(Result result);
 
-
 static void translateException(const PulsarException& ex) {
     std::string err = "Pulsar error: ";
     err += strResult(ex._result);
@@ -40,8 +39,7 @@ static void translateException(const PulsarException& ex) {
     PyErr_SetString(get_exception_class(ex._result), err.c_str());
 }
 
-BOOST_PYTHON_MODULE(_pulsar)
-{
+BOOST_PYTHON_MODULE(_pulsar) {
     py::register_exception_translator<PulsarException>(translateException);
 
     // Initialize thread support so that we can grab the GIL mutex
diff --git a/pulsar-client-cpp/python/src/reader.cc b/pulsar-client-cpp/python/src/reader.cc
index fec65da..668fb94 100644
--- a/pulsar-client-cpp/python/src/reader.cc
+++ b/pulsar-client-cpp/python/src/reader.cc
@@ -24,12 +24,12 @@ Message Reader_readNext(Reader& reader) {
 
     while (true) {
         Py_BEGIN_ALLOW_THREADS
-        // Use 100ms timeout to periodically check whether the
-        // interpreter was interrupted
-        res = reader.readNext(msg, 100);
+            // Use 100ms timeout to periodically check whether the
+            // interpreter was interrupted
+            res = reader.readNext(msg, 100);
         Py_END_ALLOW_THREADS
 
-        if (res != ResultTimeout) {
+            if (res != ResultTimeout) {
             // In case of timeout we keep calling receive() to simulate a
             // blocking call until a message is available, while breaking
             // every once in a while to check the Python signal status
@@ -49,62 +49,56 @@ Message Reader_readNext(Reader& reader) {
 Message Reader_readNextTimeout(Reader& reader, int timeoutMs) {
     Message msg;
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = reader.readNext(msg, timeoutMs);
+    Py_BEGIN_ALLOW_THREADS res = reader.readNext(msg, timeoutMs);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
     return msg;
 }
 
 bool Reader_hasMessageAvailable(Reader& reader) {
     bool available = false;
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-        res = reader.hasMessageAvailable(available);
+    Py_BEGIN_ALLOW_THREADS res = reader.hasMessageAvailable(available);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
     return available;
 }
 
 void Reader_close(Reader& reader) {
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = reader.close();
+    Py_BEGIN_ALLOW_THREADS res = reader.close();
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 }
 
 void Reader_seek(Reader& reader, const MessageId& msgId) {
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = reader.seek(msgId);
+    Py_BEGIN_ALLOW_THREADS res = reader.seek(msgId);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 }
 
 void Reader_seek_timestamp(Reader& reader, uint64_t timestamp) {
     Result res;
-    Py_BEGIN_ALLOW_THREADS
-    res = reader.seek(timestamp);
+    Py_BEGIN_ALLOW_THREADS res = reader.seek(timestamp);
     Py_END_ALLOW_THREADS
 
-    CHECK_RESULT(res);
+        CHECK_RESULT(res);
 }
 
 void export_reader() {
     using namespace boost::python;
 
     class_<Reader>("Reader", no_init)
-            .def("topic", &Reader::getTopic, 
return_value_policy<copy_const_reference>())
-            .def("read_next", &Reader_readNext)
-            .def("read_next", &Reader_readNextTimeout)
-            .def("has_message_available", &Reader_hasMessageAvailable)
-            .def("close", &Reader_close)
-            .def("seek", &Reader_seek)
-            .def("seek", &Reader_seek_timestamp)
-            ;
+        .def("topic", &Reader::getTopic, 
return_value_policy<copy_const_reference>())
+        .def("read_next", &Reader_readNext)
+        .def("read_next", &Reader_readNextTimeout)
+        .def("has_message_available", &Reader_hasMessageAvailable)
+        .def("close", &Reader_close)
+        .def("seek", &Reader_seek)
+        .def("seek", &Reader_seek_timestamp);
 }
diff --git a/pulsar-client-cpp/python/src/schema.cc b/pulsar-client-cpp/python/src/schema.cc
index 397ec65..cdfcda6 100644
--- a/pulsar-client-cpp/python/src/schema.cc
+++ b/pulsar-client-cpp/python/src/schema.cc
@@ -21,10 +21,8 @@
 void export_schema() {
     using namespace boost::python;
 
-    class_<SchemaInfo>("SchemaInfo",
-            init<SchemaType, const std::string& , const std::string&>())
-            .def("schema_type", &SchemaInfo::getSchemaType)
-            .def("name", &SchemaInfo::getName, 
return_value_policy<copy_const_reference>())
-            .def("schema", &SchemaInfo::getSchema, 
return_value_policy<copy_const_reference>())
-            ;
+    class_<SchemaInfo>("SchemaInfo", init<SchemaType, const std::string&, 
const std::string&>())
+        .def("schema_type", &SchemaInfo::getSchemaType)
+        .def("name", &SchemaInfo::getName, 
return_value_policy<copy_const_reference>())
+        .def("schema", &SchemaInfo::getSchema, 
return_value_policy<copy_const_reference>());
 }
diff --git a/pulsar-client-cpp/python/src/utils.h b/pulsar-client-cpp/python/src/utils.h
index 457d1f8..5be4473 100644
--- a/pulsar-client-cpp/python/src/utils.h
+++ b/pulsar-client-cpp/python/src/utils.h
@@ -27,8 +27,7 @@ namespace py = boost::python;
 
 struct PulsarException {
     Result _result;
-    PulsarException(Result res) :
-            _result(res) {}
+    PulsarException(Result res) : _result(res) {}
 };
 
 inline void CHECK_RESULT(Result res) {

Reply via email to