This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f1aeb06c04daf87d2e5b4dcaa4fa1fa6f5d67b7f Author: Yunze Xu <[email protected]> AuthorDate: Tue Dec 21 23:51:16 2021 +0800 Apply clang-format check for python wrapper (#13418) (cherry picked from commit 636d5b4c2133bcdf8c1988bbe82eb1064565d5dc) --- pulsar-client-cpp/CMakeLists.txt | 6 +- pulsar-client-cpp/python/src/authentication.cc | 44 ++-- pulsar-client-cpp/python/src/client.cc | 63 +++--- pulsar-client-cpp/python/src/config.cc | 256 ++++++++++++------------ pulsar-client-cpp/python/src/consumer.cc | 81 ++++---- pulsar-client-cpp/python/src/cryptoKeyReader.cc | 2 +- pulsar-client-cpp/python/src/enums.cc | 162 +++++++-------- pulsar-client-cpp/python/src/exceptions.cc | 49 +++-- pulsar-client-cpp/python/src/message.cc | 147 ++++++-------- pulsar-client-cpp/python/src/producer.cc | 71 ++++--- pulsar-client-cpp/python/src/pulsar.cc | 4 +- pulsar-client-cpp/python/src/reader.cc | 48 ++--- pulsar-client-cpp/python/src/schema.cc | 10 +- pulsar-client-cpp/python/src/utils.h | 3 +- 14 files changed, 441 insertions(+), 505 deletions(-) diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt index 8ba9569..3fadb05 100644 --- a/pulsar-client-cpp/CMakeLists.txt +++ b/pulsar-client-cpp/CMakeLists.txt @@ -446,7 +446,8 @@ add_custom_target(format python ${BUILD_SUPPORT_DIR}/run_clang_format.py ${CMAKE_SOURCE_DIR}/perf ${CMAKE_SOURCE_DIR}/examples ${CMAKE_SOURCE_DIR}/tests - ${CMAKE_SOURCE_DIR}/include) + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_SOURCE_DIR}/python/src) # `make check-format` option (for CI test) add_custom_target(check-format python ${BUILD_SUPPORT_DIR}/run_clang_format.py @@ -457,4 +458,5 @@ add_custom_target(check-format python ${BUILD_SUPPORT_DIR}/run_clang_format.py ${CMAKE_SOURCE_DIR}/perf ${CMAKE_SOURCE_DIR}/examples ${CMAKE_SOURCE_DIR}/tests - ${CMAKE_SOURCE_DIR}/include) + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_SOURCE_DIR}/python/src) diff --git a/pulsar-client-cpp/python/src/authentication.cc b/pulsar-client-cpp/python/src/authentication.cc index e236c7e..920a717 100644 --- a/pulsar-client-cpp/python/src/authentication.cc +++ b/pulsar-client-cpp/python/src/authentication.cc @@ -26,8 +26,8 @@ AuthenticationWrapper::AuthenticationWrapper(const std::string& dynamicLibPath, } struct AuthenticationTlsWrapper : public AuthenticationWrapper { - AuthenticationTlsWrapper(const std::string& certificatePath, const std::string& privateKeyPath) : - AuthenticationWrapper() { + AuthenticationTlsWrapper(const std::string& certificatePath, const std::string& privateKeyPath) + : AuthenticationWrapper() { this->auth = AuthTls::create(certificatePath, privateKeyPath); } }; @@ -35,13 +35,10 @@ struct AuthenticationTlsWrapper : public AuthenticationWrapper { struct TokenSupplierWrapper { PyObject* _pySupplier; - TokenSupplierWrapper(py::object pySupplier) : - _pySupplier(pySupplier.ptr()) { - Py_XINCREF(_pySupplier); - } + TokenSupplierWrapper(py::object pySupplier) : _pySupplier(pySupplier.ptr()) { Py_XINCREF(_pySupplier); } TokenSupplierWrapper(const TokenSupplierWrapper& other) { - _pySupplier= other._pySupplier; + _pySupplier = other._pySupplier; Py_XINCREF(_pySupplier); } @@ -51,9 +48,7 @@ struct TokenSupplierWrapper { return *this; } - virtual ~TokenSupplierWrapper() { - Py_XDECREF(_pySupplier); - } + virtual ~TokenSupplierWrapper() { Py_XDECREF(_pySupplier); } std::string operator()() { PyGILState_STATE state = PyGILState_Ensure(); @@ -61,7 +56,7 @@ struct TokenSupplierWrapper { std::string token; try { token = py::call<std::string>(_pySupplier); - } catch(const py::error_already_set& e) { + } catch (const py::error_already_set& e) { PyErr_Print(); } @@ -70,10 +65,8 @@ struct TokenSupplierWrapper { } }; - struct AuthenticationTokenWrapper : public AuthenticationWrapper { - AuthenticationTokenWrapper(py::object token) : - AuthenticationWrapper() { + AuthenticationTokenWrapper(py::object token) : AuthenticationWrapper() { if (py::extract<std::string>(token).check()) { // It's a string std::string tokenStr = py::extract<std::string>(token); @@ -86,15 +79,13 @@ struct AuthenticationTokenWrapper : public AuthenticationWrapper { }; struct AuthenticationAthenzWrapper : public AuthenticationWrapper { - AuthenticationAthenzWrapper(const std::string& authParamsString) : - AuthenticationWrapper() { + AuthenticationAthenzWrapper(const std::string& authParamsString) : AuthenticationWrapper() { this->auth = AuthAthenz::create(authParamsString); } }; struct AuthenticationOauth2Wrapper : public AuthenticationWrapper { - AuthenticationOauth2Wrapper(const std::string& authParamsString) : - AuthenticationWrapper() { + AuthenticationOauth2Wrapper(const std::string& authParamsString) : AuthenticationWrapper() { this->auth = AuthOauth2::create(authParamsString); } }; @@ -102,22 +93,17 @@ struct AuthenticationOauth2Wrapper : public AuthenticationWrapper { void export_authentication() { using namespace boost::python; - class_<AuthenticationWrapper>("Authentication", init<const std::string&, const std::string&>()) - ; + class_<AuthenticationWrapper>("Authentication", init<const std::string&, const std::string&>()); - class_<AuthenticationTlsWrapper, bases<AuthenticationWrapper> >("AuthenticationTLS", - init<const std::string&, const std::string&>()) - ; + class_<AuthenticationTlsWrapper, bases<AuthenticationWrapper> >( + "AuthenticationTLS", init<const std::string&, const std::string&>()); class_<AuthenticationTokenWrapper, bases<AuthenticationWrapper> >("AuthenticationToken", - init<py::object>()) - ; + init<py::object>()); class_<AuthenticationAthenzWrapper, bases<AuthenticationWrapper> >("AuthenticationAthenz", - init<const std::string&>()) - ; + init<const std::string&>()); class_<AuthenticationOauth2Wrapper, bases<AuthenticationWrapper> >("AuthenticationOauth2", - init<const std::string&>()) - ; + init<const std::string&>()); } diff --git a/pulsar-client-cpp/python/src/client.cc b/pulsar-client-cpp/python/src/client.cc index 3dcbf7f..445a6dc 100644 --- a/pulsar-client-cpp/python/src/client.cc +++ b/pulsar-client-cpp/python/src/client.cc @@ -22,11 +22,10 @@ Producer Client_createProducer(Client& client, const std::string& topic, const P Producer producer; Result res; - Py_BEGIN_ALLOW_THREADS - res = client.createProducer(topic, conf, producer); + Py_BEGIN_ALLOW_THREADS res = client.createProducer(topic, conf, producer); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); return producer; } @@ -35,11 +34,10 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s Consumer consumer; Result res; - Py_BEGIN_ALLOW_THREADS - res = client.subscribe(topic, subscriptionName, conf, consumer); + Py_BEGIN_ALLOW_THREADS res = client.subscribe(topic, subscriptionName, conf, consumer); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); return consumer; } @@ -50,43 +48,39 @@ Consumer Client_subscribe_topics(Client& client, boost::python::list& topics, std::vector<std::string> topics_vector; - for (int i = 0; i < len(topics); i ++) { + for (int i = 0; i < len(topics); i++) { std::string content = boost::python::extract<std::string>(topics[i]); topics_vector.push_back(content); } - Py_BEGIN_ALLOW_THREADS - res = client.subscribe(topics_vector, subscriptionName, conf, consumer); + Py_BEGIN_ALLOW_THREADS res = client.subscribe(topics_vector, subscriptionName, conf, consumer); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); return consumer; } -Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern, const std::string& subscriptionName, - const ConsumerConfiguration& conf) { +Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern, + const std::string& subscriptionName, const ConsumerConfiguration& conf) { Consumer consumer; Result res; - Py_BEGIN_ALLOW_THREADS - res = client.subscribeWithRegex(topic_pattern, subscriptionName, conf, consumer); + Py_BEGIN_ALLOW_THREADS res = client.subscribeWithRegex(topic_pattern, subscriptionName, conf, consumer); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); return consumer; } -Reader Client_createReader(Client& client, const std::string& topic, - const MessageId& startMessageId, +Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf) { Reader reader; Result res; - Py_BEGIN_ALLOW_THREADS - res = client.createReader(topic, startMessageId, conf, reader); + Py_BEGIN_ALLOW_THREADS res = client.createReader(topic, startMessageId, conf, reader); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); return reader; } @@ -94,11 +88,10 @@ boost::python::list Client_getTopicPartitions(Client& client, const std::string& std::vector<std::string> partitions; Result res; - Py_BEGIN_ALLOW_THREADS - res = client.getPartitionsForTopic(topic, partitions); + Py_BEGIN_ALLOW_THREADS res = client.getPartitionsForTopic(topic, partitions); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); boost::python::list pyList; for (int i = 0; i < partitions.size(); i++) { @@ -111,24 +104,22 @@ boost::python::list Client_getTopicPartitions(Client& client, const std::string& void Client_close(Client& client) { Result res; - Py_BEGIN_ALLOW_THREADS - res = client.close(); + Py_BEGIN_ALLOW_THREADS res = client.close(); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); } void export_client() { using namespace boost::python; - class_<Client>("Client", init<const std::string&, const ClientConfiguration& >()) - .def("create_producer", &Client_createProducer) - .def("subscribe", &Client_subscribe) - .def("subscribe_topics", &Client_subscribe_topics) - .def("subscribe_pattern", &Client_subscribe_pattern) - .def("create_reader", &Client_createReader) - .def("get_topic_partitions", &Client_getTopicPartitions) - .def("close", &Client_close) - .def("shutdown", &Client::shutdown) - ; + class_<Client>("Client", init<const std::string&, const ClientConfiguration&>()) + .def("create_producer", &Client_createProducer) + .def("subscribe", &Client_subscribe) + .def("subscribe_topics", &Client_subscribe_topics) + .def("subscribe_pattern", &Client_subscribe_pattern) + .def("create_reader", &Client_createReader) + .def("get_topic_partitions", &Client_getTopicPartitions) + .def("close", &Client_close) + .def("shutdown", &Client::shutdown); } diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index a287248..2dee1a1 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -21,14 +21,11 @@ #include "lib/Utils.h" #include <memory> -template<typename T> +template <typename T> struct ListenerWrapper { PyObject* _pyListener; - ListenerWrapper(py::object pyListener) : - _pyListener(pyListener.ptr()) { - Py_XINCREF(_pyListener); - } + ListenerWrapper(py::object pyListener) : _pyListener(pyListener.ptr()) { Py_XINCREF(_pyListener); } ListenerWrapper(const ListenerWrapper& other) { _pyListener = other._pyListener; @@ -41,9 +38,7 @@ struct ListenerWrapper { return *this; } - virtual ~ListenerWrapper() { - Py_XDECREF(_pyListener); - } + virtual ~ListenerWrapper() { Py_XDECREF(_pyListener); } void operator()(T consumer, const Message& msg) { PyGILState_STATE state = PyGILState_Ensure(); @@ -65,7 +60,7 @@ static ConsumerConfiguration& ConsumerConfiguration_setMessageListener(ConsumerC } static ReaderConfiguration& ReaderConfiguration_setReaderListener(ReaderConfiguration& conf, - py::object pyListener) { + py::object pyListener) { conf.setReaderListener(ListenerWrapper<Reader>(pyListener)); return conf; } @@ -78,41 +73,36 @@ static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfigur } static ConsumerConfiguration& ConsumerConfiguration_setCryptoKeyReader(ConsumerConfiguration& conf, - py::object cryptoKeyReader) { + py::object cryptoKeyReader) { CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader); conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader); return conf; } static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerConfiguration& conf, - py::object cryptoKeyReader) { + py::object cryptoKeyReader) { CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader); conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader); return conf; } static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfiguration& conf, - py::object cryptoKeyReader) { + py::object cryptoKeyReader) { CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader); conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader); return conf; } -class LoggerWrapper: public Logger { +class LoggerWrapper : public Logger { PyObject* const _pyLogger; const int _pythonLogLevel; const std::unique_ptr<Logger> _fallbackLogger; - static constexpr int _getLogLevelValue(Level level) { - return 10 + (level * 10); - } + static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 10); } public: - LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger) - : _pyLogger(pyLogger), - _pythonLogLevel(pythonLogLevel), - _fallbackLogger(fallbackLogger) { + : _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), _fallbackLogger(fallbackLogger) { Py_XINCREF(_pyLogger); } @@ -121,13 +111,9 @@ class LoggerWrapper: public Logger { LoggerWrapper& operator=(const LoggerWrapper&) = delete; LoggerWrapper& operator=(LoggerWrapper&&) = delete; - virtual ~LoggerWrapper() { - Py_XDECREF(_pyLogger); - } + virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); } - bool isEnabled(Level level) { - return _getLogLevelValue(level) >= _pythonLogLevel; - } + bool isEnabled(Level level) { return _getLogLevelValue(level) >= _pythonLogLevel; } void log(Level level, int line, const std::string& message) { if (!Py_IsInitialized()) { @@ -187,11 +173,9 @@ class LoggerWrapperFactory : public LoggerFactory { initializePythonLogLevel(); } - virtual ~LoggerWrapperFactory() { - Py_XDECREF(_pyLogger); - } + virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); } - Logger* getLogger(const std::string &fileName) { + Logger* getLogger(const std::string& fileName) { const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName); if (_pythonLogLevel.is_present()) { return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger); @@ -206,112 +190,128 @@ static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& c return conf; } - void export_config() { using namespace boost::python; class_<ClientConfiguration>("ClientConfiguration") - .def("authentication", &ClientConfiguration_setAuthentication, return_self<>()) - .def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds) - .def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>()) - .def("connection_timeout", &ClientConfiguration::getConnectionTimeout) - .def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_self<>()) - .def("io_threads", &ClientConfiguration::getIOThreads) - .def("io_threads", &ClientConfiguration::setIOThreads, return_self<>()) - .def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads) - .def("message_listener_threads", &ClientConfiguration::setMessageListenerThreads, return_self<>()) - .def("concurrent_lookup_requests", &ClientConfiguration::getConcurrentLookupRequest) - .def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>()) - .def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath, return_value_policy<copy_const_reference>()) - .def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>()) - .def("use_tls", &ClientConfiguration::isUseTls) - .def("use_tls", &ClientConfiguration::setUseTls, return_self<>()) - .def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath, return_value_policy<copy_const_reference>()) - .def("tls_trust_certs_file_path", &ClientConfiguration::setTlsTrustCertsFilePath, return_self<>()) - .def("tls_allow_insecure_connection", &ClientConfiguration::isTlsAllowInsecureConnection) - .def("tls_allow_insecure_connection", &ClientConfiguration::setTlsAllowInsecureConnection, return_self<>()) - .def("tls_validate_hostname", &ClientConfiguration::setValidateHostName, return_self<>()) - .def("set_logger", &ClientConfiguration_setLogger, return_self<>()) - ; + .def("authentication", &ClientConfiguration_setAuthentication, return_self<>()) + .def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds) + .def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>()) + .def("connection_timeout", &ClientConfiguration::getConnectionTimeout) + .def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_self<>()) + .def("io_threads", &ClientConfiguration::getIOThreads) + .def("io_threads", &ClientConfiguration::setIOThreads, return_self<>()) + .def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads) + .def("message_listener_threads", &ClientConfiguration::setMessageListenerThreads, return_self<>()) + .def("concurrent_lookup_requests", &ClientConfiguration::getConcurrentLookupRequest) + .def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>()) + .def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath, + return_value_policy<copy_const_reference>()) + .def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>()) + .def("use_tls", &ClientConfiguration::isUseTls) + .def("use_tls", &ClientConfiguration::setUseTls, return_self<>()) + .def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath, + return_value_policy<copy_const_reference>()) + .def("tls_trust_certs_file_path", &ClientConfiguration::setTlsTrustCertsFilePath, return_self<>()) + .def("tls_allow_insecure_connection", &ClientConfiguration::isTlsAllowInsecureConnection) + .def("tls_allow_insecure_connection", &ClientConfiguration::setTlsAllowInsecureConnection, + return_self<>()) + .def("tls_validate_hostname", &ClientConfiguration::setValidateHostName, return_self<>()) + .def("set_logger", &ClientConfiguration_setLogger, return_self<>()); class_<ProducerConfiguration>("ProducerConfiguration") - .def("producer_name", &ProducerConfiguration::getProducerName, return_value_policy<copy_const_reference>()) - .def("producer_name", &ProducerConfiguration::setProducerName, return_self<>()) - .def("schema", &ProducerConfiguration::getSchema, return_value_policy<copy_const_reference>()) - .def("schema", &ProducerConfiguration::setSchema, return_self<>()) - .def("send_timeout_millis", &ProducerConfiguration::getSendTimeout) - .def("send_timeout_millis", &ProducerConfiguration::setSendTimeout, return_self<>()) - .def("initial_sequence_id", &ProducerConfiguration::getInitialSequenceId) - .def("initial_sequence_id", &ProducerConfiguration::setInitialSequenceId, return_self<>()) - .def("compression_type", &ProducerConfiguration::getCompressionType) - .def("compression_type", &ProducerConfiguration::setCompressionType, return_self<>()) - .def("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages) - .def("max_pending_messages", &ProducerConfiguration::setMaxPendingMessages, return_self<>()) - .def("max_pending_messages_across_partitions", &ProducerConfiguration::getMaxPendingMessagesAcrossPartitions) - .def("max_pending_messages_across_partitions", &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>()) - .def("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull) - .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>()) - .def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode) - .def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>()) - .def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers) - .def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers, return_self<>()) - .def("batching_enabled", &ProducerConfiguration::getBatchingEnabled, return_value_policy<copy_const_reference>()) - .def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>()) - .def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages, return_value_policy<copy_const_reference>()) - .def("batching_max_messages", &ProducerConfiguration::setBatchingMaxMessages, return_self<>()) - .def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::getBatchingMaxAllowedSizeInBytes, return_value_policy<copy_const_reference>()) - .def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes, return_self<>()) - .def("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs, return_value_policy<copy_const_reference>()) - .def("batching_max_publish_delay_ms", &ProducerConfiguration::setBatchingMaxPublishDelayMs, return_self<>()) - .def("property", &ProducerConfiguration::setProperty, return_self<>()) - .def("batching_type", &ProducerConfiguration::setBatchingType, return_self<>()) - .def("batching_type", &ProducerConfiguration::getBatchingType) - .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>()) - .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>()) - ; + .def("producer_name", &ProducerConfiguration::getProducerName, + return_value_policy<copy_const_reference>()) + .def("producer_name", &ProducerConfiguration::setProducerName, return_self<>()) + .def("schema", &ProducerConfiguration::getSchema, return_value_policy<copy_const_reference>()) + .def("schema", &ProducerConfiguration::setSchema, return_self<>()) + .def("send_timeout_millis", &ProducerConfiguration::getSendTimeout) + .def("send_timeout_millis", &ProducerConfiguration::setSendTimeout, return_self<>()) + .def("initial_sequence_id", &ProducerConfiguration::getInitialSequenceId) + .def("initial_sequence_id", &ProducerConfiguration::setInitialSequenceId, return_self<>()) + .def("compression_type", &ProducerConfiguration::getCompressionType) + .def("compression_type", &ProducerConfiguration::setCompressionType, return_self<>()) + .def("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages) + .def("max_pending_messages", &ProducerConfiguration::setMaxPendingMessages, return_self<>()) + .def("max_pending_messages_across_partitions", + &ProducerConfiguration::getMaxPendingMessagesAcrossPartitions) + .def("max_pending_messages_across_partitions", + &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>()) + .def("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull) + .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>()) + .def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode) + .def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>()) + .def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers) + .def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers, + return_self<>()) + .def("batching_enabled", &ProducerConfiguration::getBatchingEnabled, + return_value_policy<copy_const_reference>()) + .def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>()) + .def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages, + return_value_policy<copy_const_reference>()) + .def("batching_max_messages", &ProducerConfiguration::setBatchingMaxMessages, return_self<>()) + .def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::getBatchingMaxAllowedSizeInBytes, + return_value_policy<copy_const_reference>()) + .def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes, + return_self<>()) + .def("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs, + return_value_policy<copy_const_reference>()) + .def("batching_max_publish_delay_ms", &ProducerConfiguration::setBatchingMaxPublishDelayMs, + return_self<>()) + .def("property", &ProducerConfiguration::setProperty, return_self<>()) + .def("batching_type", &ProducerConfiguration::setBatchingType, return_self<>()) + .def("batching_type", &ProducerConfiguration::getBatchingType) + .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>()) + .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>()); class_<ConsumerConfiguration>("ConsumerConfiguration") - .def("consumer_type", &ConsumerConfiguration::getConsumerType) - .def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>()) - .def("schema", &ConsumerConfiguration::getSchema, return_value_policy<copy_const_reference>()) - .def("schema", &ConsumerConfiguration::setSchema, return_self<>()) - .def("message_listener", &ConsumerConfiguration_setMessageListener, return_self<>()) - .def("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize) - .def("receiver_queue_size", &ConsumerConfiguration::setReceiverQueueSize) - .def("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions) - .def("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions) - .def("consumer_name", &ConsumerConfiguration::getConsumerName, return_value_policy<copy_const_reference>()) - .def("consumer_name", &ConsumerConfiguration::setConsumerName) - .def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs) - .def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs) - .def("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::getNegativeAckRedeliveryDelayMs) - .def("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::setNegativeAckRedeliveryDelayMs) - .def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs) - .def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs) - .def("pattern_auto_discovery_period", &ConsumerConfiguration::getPatternAutoDiscoveryPeriod) - .def("pattern_auto_discovery_period", &ConsumerConfiguration::setPatternAutoDiscoveryPeriod) - .def("read_compacted", &ConsumerConfiguration::isReadCompacted) - .def("read_compacted", &ConsumerConfiguration::setReadCompacted) - .def("property", &ConsumerConfiguration::setProperty, return_self<>()) - .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition) - .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition) - .def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>()) - .def("replicate_subscription_state_enabled", &ConsumerConfiguration::setReplicateSubscriptionStateEnabled) - .def("replicate_subscription_state_enabled", &ConsumerConfiguration::isReplicateSubscriptionStateEnabled) - ; + .def("consumer_type", &ConsumerConfiguration::getConsumerType) + .def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>()) + .def("schema", &ConsumerConfiguration::getSchema, return_value_policy<copy_const_reference>()) + .def("schema", &ConsumerConfiguration::setSchema, return_self<>()) + .def("message_listener", &ConsumerConfiguration_setMessageListener, return_self<>()) + .def("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize) + .def("receiver_queue_size", &ConsumerConfiguration::setReceiverQueueSize) + .def("max_total_receiver_queue_size_across_partitions", + &ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions) + .def("max_total_receiver_queue_size_across_partitions", + &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions) + .def("consumer_name", &ConsumerConfiguration::getConsumerName, + return_value_policy<copy_const_reference>()) + .def("consumer_name", &ConsumerConfiguration::setConsumerName) + .def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs) + .def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs) + .def("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::getNegativeAckRedeliveryDelayMs) + .def("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::setNegativeAckRedeliveryDelayMs) + .def("broker_consumer_stats_cache_time_ms", + &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs) + .def("broker_consumer_stats_cache_time_ms", + &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs) + .def("pattern_auto_discovery_period", &ConsumerConfiguration::getPatternAutoDiscoveryPeriod) + .def("pattern_auto_discovery_period", &ConsumerConfiguration::setPatternAutoDiscoveryPeriod) + .def("read_compacted", &ConsumerConfiguration::isReadCompacted) + .def("read_compacted", &ConsumerConfiguration::setReadCompacted) + .def("property", &ConsumerConfiguration::setProperty, return_self<>()) + .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition) + .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition) + .def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>()) + .def("replicate_subscription_state_enabled", + &ConsumerConfiguration::setReplicateSubscriptionStateEnabled) + .def("replicate_subscription_state_enabled", + &ConsumerConfiguration::isReplicateSubscriptionStateEnabled); class_<ReaderConfiguration>("ReaderConfiguration") - .def("reader_listener", &ReaderConfiguration_setReaderListener, return_self<>()) - .def("schema", &ReaderConfiguration::getSchema, return_value_policy<copy_const_reference>()) - .def("schema", &ReaderConfiguration::setSchema, return_self<>()) - .def("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize) - .def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize) - .def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy<copy_const_reference>()) - .def("reader_name", &ReaderConfiguration::setReaderName) - .def("subscription_role_prefix", &ReaderConfiguration::getSubscriptionRolePrefix, return_value_policy<copy_const_reference>()) - .def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix) - .def("read_compacted", &ReaderConfiguration::isReadCompacted) - .def("read_compacted", &ReaderConfiguration::setReadCompacted) - .def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, return_self<>()) - ; + .def("reader_listener", &ReaderConfiguration_setReaderListener, return_self<>()) + .def("schema", &ReaderConfiguration::getSchema, return_value_policy<copy_const_reference>()) + .def("schema", &ReaderConfiguration::setSchema, return_self<>()) + .def("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize) + .def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize) + .def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy<copy_const_reference>()) + .def("reader_name", &ReaderConfiguration::setReaderName) + .def("subscription_role_prefix", &ReaderConfiguration::getSubscriptionRolePrefix, + return_value_policy<copy_const_reference>()) + .def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix) + .def("read_compacted", &ReaderConfiguration::isReadCompacted) + .def("read_compacted", &ReaderConfiguration::setReadCompacted) + .def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, return_self<>()); } diff --git a/pulsar-client-cpp/python/src/consumer.cc b/pulsar-client-cpp/python/src/consumer.cc index 815282d..28bedad 100644 --- a/pulsar-client-cpp/python/src/consumer.cc +++ b/pulsar-client-cpp/python/src/consumer.cc @@ -20,11 +20,10 @@ void Consumer_unsubscribe(Consumer& consumer) { Result res; - Py_BEGIN_ALLOW_THREADS - res = consumer.unsubscribe(); + Py_BEGIN_ALLOW_THREADS res = consumer.unsubscribe(); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); } Message Consumer_receive(Consumer& consumer) { @@ -32,11 +31,10 @@ Message Consumer_receive(Consumer& consumer) { Result res; while (true) { - Py_BEGIN_ALLOW_THREADS - res = consumer.receive(msg); + Py_BEGIN_ALLOW_THREADS res = consumer.receive(msg); Py_END_ALLOW_THREADS - if (res != ResultTimeout) { + if (res != ResultTimeout) { // In case of timeout we keep calling receive() to simulate a // blocking call until a message is available, while breaking // every once in a while to check the Python signal status @@ -56,17 +54,14 @@ Message Consumer_receive(Consumer& consumer) { Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) { Message msg; Result res; - Py_BEGIN_ALLOW_THREADS - res = consumer.receive(msg, timeoutMs); + Py_BEGIN_ALLOW_THREADS res = consumer.receive(msg, timeoutMs); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); return msg; } -void Consumer_acknowledge(Consumer& consumer, const Message& msg) { - consumer.acknowledgeAsync(msg, nullptr); -} +void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); } void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { consumer.acknowledgeAsync(msgId, nullptr); @@ -77,7 +72,7 @@ void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) { } void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { - consumer.negativeAcknowledge(msgId); + consumer.negativeAcknowledge(msgId); } void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) { @@ -90,60 +85,52 @@ void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const Messag void Consumer_close(Consumer& consumer) { Result res; - Py_BEGIN_ALLOW_THREADS - res = consumer.close(); + Py_BEGIN_ALLOW_THREADS res = consumer.close(); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); } -void Consumer_pauseMessageListener(Consumer& consumer) { - CHECK_RESULT(consumer.pauseMessageListener()); -} +void Consumer_pauseMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.pauseMessageListener()); } -void Consumer_resumeMessageListener(Consumer& consumer) { - CHECK_RESULT(consumer.resumeMessageListener()); -} +void Consumer_resumeMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.resumeMessageListener()); } void Consumer_seek(Consumer& consumer, const MessageId& msgId) { Result res; - Py_BEGIN_ALLOW_THREADS - res = consumer.seek(msgId); + Py_BEGIN_ALLOW_THREADS res = consumer.seek(msgId); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); } void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) { Result res; - Py_BEGIN_ALLOW_THREADS - res = consumer.seek(timestamp); + Py_BEGIN_ALLOW_THREADS res = consumer.seek(timestamp); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); } void export_consumer() { using namespace boost::python; class_<Consumer>("Consumer", no_init) - .def("topic", &Consumer::getTopic, "return the topic this consumer is subscribed to", - return_value_policy<copy_const_reference>()) - .def("subscription_name", &Consumer::getSubscriptionName, return_value_policy<copy_const_reference>()) - .def("unsubscribe", &Consumer_unsubscribe) - .def("receive", &Consumer_receive) - .def("receive", &Consumer_receive_timeout) - .def("acknowledge", &Consumer_acknowledge) - .def("acknowledge", &Consumer_acknowledge_message_id) - .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative) - .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative_message_id) - .def("negative_acknowledge", &Consumer_negative_acknowledge) - .def("negative_acknowledge", &Consumer_negative_acknowledge_message_id) - .def("close", &Consumer_close) - .def("pause_message_listener", &Consumer_pauseMessageListener) - .def("resume_message_listener", &Consumer_resumeMessageListener) - .def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages) - .def("seek", &Consumer_seek) - .def("seek", &Consumer_seek_timestamp) - ; + .def("topic", &Consumer::getTopic, "return the topic this consumer is subscribed to", + return_value_policy<copy_const_reference>()) + .def("subscription_name", &Consumer::getSubscriptionName, return_value_policy<copy_const_reference>()) + .def("unsubscribe", &Consumer_unsubscribe) + .def("receive", &Consumer_receive) + .def("receive", &Consumer_receive_timeout) + .def("acknowledge", &Consumer_acknowledge) + .def("acknowledge", &Consumer_acknowledge_message_id) + .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative) + .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative_message_id) + .def("negative_acknowledge", &Consumer_negative_acknowledge) + .def("negative_acknowledge", &Consumer_negative_acknowledge_message_id) + .def("close", &Consumer_close) + .def("pause_message_listener", &Consumer_pauseMessageListener) + .def("resume_message_listener", &Consumer_resumeMessageListener) + .def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages) + .def("seek", &Consumer_seek) + .def("seek", &Consumer_seek_timestamp); } diff --git a/pulsar-client-cpp/python/src/cryptoKeyReader.cc b/pulsar-client-cpp/python/src/cryptoKeyReader.cc index ccefe6f..2c46b6f 100644 --- a/pulsar-client-cpp/python/src/cryptoKeyReader.cc +++ b/pulsar-client-cpp/python/src/cryptoKeyReader.cc @@ -21,7 +21,7 @@ CryptoKeyReaderWrapper::CryptoKeyReaderWrapper() {} CryptoKeyReaderWrapper::CryptoKeyReaderWrapper(const std::string& publicKeyPath, - const std::string& privateKeyPath) { + const std::string& privateKeyPath) { this->cryptoKeyReader = DefaultCryptoKeyReader::create(publicKeyPath, privateKeyPath); } diff --git a/pulsar-client-cpp/python/src/enums.cc b/pulsar-client-cpp/python/src/enums.cc index c23b211..1b21af5 100644 --- a/pulsar-client-cpp/python/src/enums.cc +++ b/pulsar-client-cpp/python/src/enums.cc @@ -18,104 +18,96 @@ */ #include "utils.h" - void export_enums() { using namespace boost::python; enum_<ProducerConfiguration::PartitionsRoutingMode>("PartitionsRoutingMode") - .value("UseSinglePartition", ProducerConfiguration::UseSinglePartition) - .value("RoundRobinDistribution", ProducerConfiguration::RoundRobinDistribution) - .value("CustomPartition", ProducerConfiguration::CustomPartition) - ; + .value("UseSinglePartition", ProducerConfiguration::UseSinglePartition) + .value("RoundRobinDistribution", ProducerConfiguration::RoundRobinDistribution) + .value("CustomPartition", ProducerConfiguration::CustomPartition); enum_<CompressionType>("CompressionType") - .value("NONE", CompressionNone) // Don't use 'None' since it's a keyword in py3 - .value("LZ4", CompressionLZ4) - .value("ZLib", CompressionZLib) - .value("ZSTD", CompressionZSTD) - .value("SNAPPY", CompressionSNAPPY) - ; + .value("NONE", CompressionNone) // Don't use 'None' since it's a keyword in py3 + .value("LZ4", CompressionLZ4) + .value("ZLib", CompressionZLib) + .value("ZSTD", CompressionZSTD) + .value("SNAPPY", CompressionSNAPPY); enum_<ConsumerType>("ConsumerType") - .value("Exclusive", ConsumerExclusive) - .value("Shared", ConsumerShared) - .value("Failover", ConsumerFailover) - .value("KeyShared", ConsumerKeyShared) - ; + .value("Exclusive", ConsumerExclusive) + .value("Shared", ConsumerShared) + .value("Failover", ConsumerFailover) + .value("KeyShared", ConsumerKeyShared); - enum_<Result >("Result", "Collection of return codes") - .value("Ok", ResultOk) - .value("UnknownError", ResultUnknownError) - .value("InvalidConfiguration", ResultInvalidConfiguration) - .value("Timeout", ResultTimeout) - .value("LookupError", ResultLookupError) - .value("ConnectError", ResultConnectError) - .value("ReadError", ResultReadError) - .value("AuthenticationError", ResultAuthenticationError) - .value("AuthorizationError", ResultAuthorizationError) - .value("ErrorGettingAuthenticationData", ResultErrorGettingAuthenticationData) - .value("BrokerMetadataError", ResultBrokerMetadataError) - .value("BrokerPersistenceError", ResultBrokerPersistenceError) - .value("ChecksumError", ResultChecksumError) - .value("ConsumerBusy", ResultConsumerBusy) - .value("NotConnected", ResultNotConnected) - .value("AlreadyClosed", ResultAlreadyClosed) - .value("InvalidMessage", ResultInvalidMessage) - .value("ConsumerNotInitialized", ResultConsumerNotInitialized) - .value("ProducerNotInitialized", ResultProducerNotInitialized) - .value("ProducerBusy", ResultProducerBusy) - .value("TooManyLookupRequestException", ResultTooManyLookupRequestException) - .value("InvalidTopicName", ResultInvalidTopicName) - .value("InvalidUrl", ResultInvalidUrl) - .value("ServiceUnitNotReady", ResultServiceUnitNotReady) - .value("OperationNotSupported", ResultOperationNotSupported) - .value("ProducerBlockedQuotaExceededError", ResultProducerBlockedQuotaExceededError) - .value("ProducerBlockedQuotaExceededException", ResultProducerBlockedQuotaExceededException) - .value("ProducerQueueIsFull", ResultProducerQueueIsFull) - .value("MessageTooBig", ResultMessageTooBig) - .value("TopicNotFound", ResultTopicNotFound) - .value("SubscriptionNotFound", ResultSubscriptionNotFound) - .value("ConsumerNotFound", ResultConsumerNotFound) - .value("UnsupportedVersionError", ResultUnsupportedVersionError) - .value("TopicTerminated", ResultTopicTerminated) - .value("CryptoError", ResultCryptoError) - .value("IncompatibleSchema", ResultIncompatibleSchema) - .value("ConsumerAssignError", ResultConsumerAssignError) - .value("CumulativeAcknowledgementNotAllowedError", ResultCumulativeAcknowledgementNotAllowedError) - .value("TransactionCoordinatorNotFoundError", ResultTransactionCoordinatorNotFoundError) - .value("InvalidTxnStatusError", ResultInvalidTxnStatusError) - .value("NotAllowedError", ResultNotAllowedError) - .value("TransactionConflict", ResultTransactionConflict) - .value("TransactionNotFound", ResultTransactionNotFound) - .value("ProducerFenced", ResultProducerFenced) - .value("MemoryBufferIsFull", ResultMemoryBufferIsFull) - ; + enum_<Result>("Result", "Collection of return codes") + .value("Ok", ResultOk) + .value("UnknownError", ResultUnknownError) + .value("InvalidConfiguration", ResultInvalidConfiguration) + .value("Timeout", ResultTimeout) + .value("LookupError", ResultLookupError) + .value("ConnectError", ResultConnectError) + .value("ReadError", ResultReadError) + .value("AuthenticationError", ResultAuthenticationError) + .value("AuthorizationError", ResultAuthorizationError) + .value("ErrorGettingAuthenticationData", ResultErrorGettingAuthenticationData) + .value("BrokerMetadataError", ResultBrokerMetadataError) + .value("BrokerPersistenceError", ResultBrokerPersistenceError) + .value("ChecksumError", ResultChecksumError) + .value("ConsumerBusy", ResultConsumerBusy) + .value("NotConnected", ResultNotConnected) + .value("AlreadyClosed", ResultAlreadyClosed) + .value("InvalidMessage", ResultInvalidMessage) + .value("ConsumerNotInitialized", ResultConsumerNotInitialized) + .value("ProducerNotInitialized", ResultProducerNotInitialized) + .value("ProducerBusy", ResultProducerBusy) + .value("TooManyLookupRequestException", ResultTooManyLookupRequestException) + .value("InvalidTopicName", ResultInvalidTopicName) + .value("InvalidUrl", ResultInvalidUrl) + .value("ServiceUnitNotReady", ResultServiceUnitNotReady) + .value("OperationNotSupported", ResultOperationNotSupported) + .value("ProducerBlockedQuotaExceededError", ResultProducerBlockedQuotaExceededError) + .value("ProducerBlockedQuotaExceededException", ResultProducerBlockedQuotaExceededException) + .value("ProducerQueueIsFull", ResultProducerQueueIsFull) + .value("MessageTooBig", ResultMessageTooBig) + .value("TopicNotFound", ResultTopicNotFound) + .value("SubscriptionNotFound", ResultSubscriptionNotFound) + .value("ConsumerNotFound", ResultConsumerNotFound) + .value("UnsupportedVersionError", ResultUnsupportedVersionError) + .value("TopicTerminated", ResultTopicTerminated) + .value("CryptoError", ResultCryptoError) + .value("IncompatibleSchema", ResultIncompatibleSchema) + .value("ConsumerAssignError", ResultConsumerAssignError) + .value("CumulativeAcknowledgementNotAllowedError", ResultCumulativeAcknowledgementNotAllowedError) + .value("TransactionCoordinatorNotFoundError", ResultTransactionCoordinatorNotFoundError) + .value("InvalidTxnStatusError", ResultInvalidTxnStatusError) + .value("NotAllowedError", ResultNotAllowedError) + .value("TransactionConflict", ResultTransactionConflict) + .value("TransactionNotFound", ResultTransactionNotFound) + .value("ProducerFenced", ResultProducerFenced) + .value("MemoryBufferIsFull", ResultMemoryBufferIsFull); enum_<SchemaType>("SchemaType", "Supported schema types") - .value("NONE", pulsar::NONE) - .value("STRING", pulsar::STRING) - .value("INT8", pulsar::INT8) - .value("INT16", pulsar::INT16) - .value("INT32", pulsar::INT32) - .value("INT64", pulsar::INT64) - .value("FLOAT", pulsar::FLOAT) - .value("DOUBLE", pulsar::DOUBLE) - .value("BYTES", pulsar::BYTES) - .value("JSON", pulsar::JSON) - .value("PROTOBUF", pulsar::PROTOBUF) - .value("AVRO", pulsar::AVRO) - .value("AUTO_CONSUME", pulsar::AUTO_CONSUME) - .value("AUTO_PUBLISH", pulsar::AUTO_PUBLISH) - .value("KEY_VALUE", pulsar::KEY_VALUE) - ; + .value("NONE", pulsar::NONE) + .value("STRING", pulsar::STRING) + .value("INT8", pulsar::INT8) + .value("INT16", pulsar::INT16) + .value("INT32", pulsar::INT32) + .value("INT64", pulsar::INT64) + .value("FLOAT", pulsar::FLOAT) + .value("DOUBLE", pulsar::DOUBLE) + .value("BYTES", pulsar::BYTES) + .value("JSON", pulsar::JSON) + .value("PROTOBUF", pulsar::PROTOBUF) + .value("AVRO", pulsar::AVRO) + .value("AUTO_CONSUME", pulsar::AUTO_CONSUME) + .value("AUTO_PUBLISH", pulsar::AUTO_PUBLISH) + .value("KEY_VALUE", pulsar::KEY_VALUE); enum_<InitialPosition>("InitialPosition", "Supported initial position") - .value("Latest", InitialPositionLatest) - .value("Earliest", InitialPositionEarliest) - ; + .value("Latest", InitialPositionLatest) + .value("Earliest", InitialPositionEarliest); enum_<ProducerConfiguration::BatchingType>("BatchingType", "Supported batching types") - .value("Default", ProducerConfiguration::DefaultBatching) - .value("KeyBased", ProducerConfiguration::KeyBasedBatching) - ; + .value("Default", ProducerConfiguration::DefaultBatching) + .value("KeyBased", ProducerConfiguration::KeyBasedBatching); } diff --git a/pulsar-client-cpp/python/src/exceptions.cc b/pulsar-client-cpp/python/src/exceptions.cc index c39b52d..25b3bd0 100644 --- a/pulsar-client-cpp/python/src/exceptions.cc +++ b/pulsar-client-cpp/python/src/exceptions.cc @@ -29,16 +29,13 @@ PyObject* createExceptionClass(const char* name, PyObject* baseTypeObj = PyExc_E std::string fullName = "_pulsar."; fullName += name; - PyObject* typeObj = PyErr_NewException(const_cast<char*>(fullName.c_str()), - baseTypeObj, nullptr); + PyObject* typeObj = PyErr_NewException(const_cast<char*>(fullName.c_str()), baseTypeObj, nullptr); if (!typeObj) throw_error_already_set(); scope().attr(name) = handle<>(borrowed(typeObj)); return typeObj; } -PyObject* get_exception_class(Result result) { - return exceptions[result]; -} +PyObject* get_exception_class(Result result) { return exceptions[result]; } void export_exceptions() { using namespace boost::python; @@ -46,44 +43,58 @@ void export_exceptions() { basePulsarException = createExceptionClass("PulsarException"); exceptions[ResultUnknownError] = createExceptionClass("UnknownError", basePulsarException); - exceptions[ResultInvalidConfiguration] = createExceptionClass("InvalidConfiguration", basePulsarException); + exceptions[ResultInvalidConfiguration] = + createExceptionClass("InvalidConfiguration", basePulsarException); exceptions[ResultTimeout] = createExceptionClass("Timeout", basePulsarException); exceptions[ResultLookupError] = createExceptionClass("LookupError", basePulsarException); exceptions[ResultConnectError] = createExceptionClass("ConnectError", basePulsarException); exceptions[ResultReadError] = createExceptionClass("ReadError", basePulsarException); exceptions[ResultAuthenticationError] = createExceptionClass("AuthenticationError", basePulsarException); exceptions[ResultAuthorizationError] = createExceptionClass("AuthorizationError", basePulsarException); - exceptions[ResultErrorGettingAuthenticationData] = createExceptionClass("ErrorGettingAuthenticationData", basePulsarException); + exceptions[ResultErrorGettingAuthenticationData] = + createExceptionClass("ErrorGettingAuthenticationData", basePulsarException); exceptions[ResultBrokerMetadataError] = createExceptionClass("BrokerMetadataError", basePulsarException); - exceptions[ResultBrokerPersistenceError] = createExceptionClass("BrokerPersistenceError", basePulsarException); + exceptions[ResultBrokerPersistenceError] = + createExceptionClass("BrokerPersistenceError", basePulsarException); exceptions[ResultChecksumError] = createExceptionClass("ChecksumError", basePulsarException); exceptions[ResultConsumerBusy] = createExceptionClass("ConsumerBusy", basePulsarException); exceptions[ResultNotConnected] = createExceptionClass("NotConnected", basePulsarException); exceptions[ResultAlreadyClosed] = createExceptionClass("AlreadyClosed", basePulsarException); exceptions[ResultInvalidMessage] = createExceptionClass("InvalidMessage", basePulsarException); - exceptions[ResultConsumerNotInitialized] = createExceptionClass("ConsumerNotInitialized", basePulsarException); - exceptions[ResultProducerNotInitialized] = createExceptionClass("ProducerNotInitialized", basePulsarException); + exceptions[ResultConsumerNotInitialized] = + createExceptionClass("ConsumerNotInitialized", basePulsarException); + exceptions[ResultProducerNotInitialized] = + createExceptionClass("ProducerNotInitialized", basePulsarException); exceptions[ResultProducerBusy] = createExceptionClass("ProducerBusy", basePulsarException); - exceptions[ResultTooManyLookupRequestException] = createExceptionClass("TooManyLookupRequestException", basePulsarException); + exceptions[ResultTooManyLookupRequestException] = + createExceptionClass("TooManyLookupRequestException", basePulsarException); exceptions[ResultInvalidTopicName] = createExceptionClass("InvalidTopicName", basePulsarException); exceptions[ResultInvalidUrl] = createExceptionClass("InvalidUrl", basePulsarException); exceptions[ResultServiceUnitNotReady] = createExceptionClass("ServiceUnitNotReady", basePulsarException); - exceptions[ResultOperationNotSupported] = createExceptionClass("OperationNotSupported", basePulsarException); - exceptions[ResultProducerBlockedQuotaExceededError] = createExceptionClass("ProducerBlockedQuotaExceededError", basePulsarException); - exceptions[ResultProducerBlockedQuotaExceededException] = createExceptionClass("ProducerBlockedQuotaExceededException", basePulsarException); + exceptions[ResultOperationNotSupported] = + createExceptionClass("OperationNotSupported", basePulsarException); + exceptions[ResultProducerBlockedQuotaExceededError] = + createExceptionClass("ProducerBlockedQuotaExceededError", basePulsarException); + exceptions[ResultProducerBlockedQuotaExceededException] = + createExceptionClass("ProducerBlockedQuotaExceededException", basePulsarException); exceptions[ResultProducerQueueIsFull] = createExceptionClass("ProducerQueueIsFull", basePulsarException); exceptions[ResultMessageTooBig] = createExceptionClass("MessageTooBig", basePulsarException); exceptions[ResultTopicNotFound] = createExceptionClass("TopicNotFound", basePulsarException); - exceptions[ResultSubscriptionNotFound] = createExceptionClass("SubscriptionNotFound", basePulsarException); + exceptions[ResultSubscriptionNotFound] = + createExceptionClass("SubscriptionNotFound", basePulsarException); exceptions[ResultConsumerNotFound] = createExceptionClass("ConsumerNotFound", basePulsarException); - exceptions[ResultUnsupportedVersionError] = createExceptionClass("UnsupportedVersionError", basePulsarException); + exceptions[ResultUnsupportedVersionError] = + createExceptionClass("UnsupportedVersionError", basePulsarException); exceptions[ResultTopicTerminated] = createExceptionClass("TopicTerminated", basePulsarException); exceptions[ResultCryptoError] = createExceptionClass("CryptoError", basePulsarException); exceptions[ResultIncompatibleSchema] = createExceptionClass("IncompatibleSchema", basePulsarException); exceptions[ResultConsumerAssignError] = createExceptionClass("ConsumerAssignError", basePulsarException); - exceptions[ResultCumulativeAcknowledgementNotAllowedError] = createExceptionClass("CumulativeAcknowledgementNotAllowedError", basePulsarException); - exceptions[ResultTransactionCoordinatorNotFoundError] = createExceptionClass("TransactionCoordinatorNotFoundError", basePulsarException); - exceptions[ResultInvalidTxnStatusError] = createExceptionClass("InvalidTxnStatusError", basePulsarException); + exceptions[ResultCumulativeAcknowledgementNotAllowedError] = + createExceptionClass("CumulativeAcknowledgementNotAllowedError", basePulsarException); + exceptions[ResultTransactionCoordinatorNotFoundError] = + createExceptionClass("TransactionCoordinatorNotFoundError", basePulsarException); + exceptions[ResultInvalidTxnStatusError] = + createExceptionClass("InvalidTxnStatusError", basePulsarException); exceptions[ResultNotAllowedError] = createExceptionClass("NotAllowedError", basePulsarException); exceptions[ResultTransactionConflict] = createExceptionClass("TransactionConflict", basePulsarException); exceptions[ResultTransactionNotFound] = createExceptionClass("TransactionNotFound", basePulsarException); diff --git a/pulsar-client-cpp/python/src/message.cc b/pulsar-client-cpp/python/src/message.cc index 8532966..b93380b 100644 --- a/pulsar-client-cpp/python/src/message.cc +++ b/pulsar-client-cpp/python/src/message.cc @@ -28,34 +28,23 @@ std::string MessageId_str(const MessageId& msgId) { return ss.str(); } -bool MessageId_eq(const MessageId& a, const MessageId& b) { - return a == b; -} +bool MessageId_eq(const MessageId& a, const MessageId& b) { return a == b; } -bool MessageId_ne(const MessageId& a, const MessageId& b) { - return a != b; -} +bool MessageId_ne(const MessageId& a, const MessageId& b) { return a != b; } -bool MessageId_lt(const MessageId& a, const MessageId& b) { - return a < b; -} +bool MessageId_lt(const MessageId& a, const MessageId& b) { return a < b; } -bool MessageId_le(const MessageId& a, const MessageId& b) { - return a <= b; -} +bool MessageId_le(const MessageId& a, const MessageId& b) { return a <= b; } -bool MessageId_gt(const MessageId& a, const MessageId& b) { - return a > b; -} +bool MessageId_gt(const MessageId& a, const MessageId& b) { return a > b; } -bool MessageId_ge(const MessageId& a, const MessageId& b) { - return a >= b; -} +bool MessageId_ge(const MessageId& a, const MessageId& b) { return a >= b; } boost::python::object MessageId_serialize(const MessageId& msgId) { std::string serialized; msgId.serialize(serialized); - return boost::python::object(boost::python::handle<>(PyBytes_FromStringAndSize(serialized.c_str(), serialized.length()))); + return boost::python::object( + boost::python::handle<>(PyBytes_FromStringAndSize(serialized.c_str(), serialized.length()))); } std::string Message_str(const Message& msg) { @@ -65,7 +54,8 @@ std::string Message_str(const Message& msg) { } boost::python::object Message_data(const Message& msg) { - return boost::python::object(boost::python::handle<>(PyBytes_FromStringAndSize((const char*)msg.getData(), msg.getLength()))); + return boost::python::object( + boost::python::handle<>(PyBytes_FromStringAndSize((const char*)msg.getData(), msg.getLength()))); } boost::python::object Message_properties(const Message& msg) { @@ -88,9 +78,7 @@ std::string schema_version_str(const Message& msg) { return ss.str(); } -const MessageId& Message_getMessageId(const Message& msg) { - return msg.getMessageId(); -} +const MessageId& Message_getMessageId(const Message& msg) { return msg.getMessageId(); } void deliverAfter(MessageBuilder* const builder, PyObject* obj_delta) { PyDateTime_Delta const* pydelta = reinterpret_cast<PyDateTime_Delta*>(obj_delta); @@ -102,12 +90,9 @@ void deliverAfter(MessageBuilder* const builder, PyObject* obj_delta) { } // Create chrono duration object - std::chrono::milliseconds - duration = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::hours(24)*days - + std::chrono::seconds(pydelta->seconds) - + std::chrono::microseconds(pydelta->microseconds) - ); + std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::hours(24) * days + std::chrono::seconds(pydelta->seconds) + + std::chrono::microseconds(pydelta->microseconds)); if (is_negative) { duration = duration * -1; @@ -121,70 +106,66 @@ void export_message() { PyDateTime_IMPORT; - MessageBuilder& (MessageBuilder::*MessageBuilderSetContentString)(const std::string&) = &MessageBuilder::setContent; + MessageBuilder& (MessageBuilder::*MessageBuilderSetContentString)(const std::string&) = + &MessageBuilder::setContent; class_<MessageBuilder, boost::noncopyable>("MessageBuilder") - .def("content", MessageBuilderSetContentString, return_self<>()) - .def("property", &MessageBuilder::setProperty, return_self<>()) - .def("properties", &MessageBuilder::setProperties, return_self<>()) - .def("sequence_id", &MessageBuilder::setSequenceId, return_self<>()) - .def("deliver_after", &deliverAfter, return_self<>()) - .def("deliver_at", &MessageBuilder::setDeliverAt, return_self<>()) - .def("partition_key", &MessageBuilder::setPartitionKey, return_self<>()) - .def("event_timestamp", &MessageBuilder::setEventTimestamp, return_self<>()) - .def("replication_clusters", &MessageBuilder::setReplicationClusters, return_self<>()) - .def("disable_replication", &MessageBuilder::disableReplication, return_self<>()) - .def("build", &MessageBuilder::build) - ; - - class_<Message::StringMap>("MessageStringMap") - .def(map_indexing_suite<Message::StringMap>()) - ; + .def("content", MessageBuilderSetContentString, return_self<>()) + .def("property", &MessageBuilder::setProperty, return_self<>()) + .def("properties", &MessageBuilder::setProperties, return_self<>()) + .def("sequence_id", &MessageBuilder::setSequenceId, return_self<>()) + .def("deliver_after", &deliverAfter, return_self<>()) + .def("deliver_at", &MessageBuilder::setDeliverAt, return_self<>()) + .def("partition_key", &MessageBuilder::setPartitionKey, return_self<>()) + .def("event_timestamp", &MessageBuilder::setEventTimestamp, return_self<>()) + .def("replication_clusters", &MessageBuilder::setReplicationClusters, return_self<>()) + .def("disable_replication", &MessageBuilder::disableReplication, return_self<>()) + .def("build", &MessageBuilder::build); + + class_<Message::StringMap>("MessageStringMap").def(map_indexing_suite<Message::StringMap>()); static const MessageId& _MessageId_earliest = MessageId::earliest(); static const MessageId& _MessageId_latest = MessageId::latest(); class_<MessageId>("MessageId") - .def(init<int32_t, int64_t, int64_t, int32_t>()) - .def("__str__", &MessageId_str) - .def("__eq__", &MessageId_eq) - .def("__ne__", &MessageId_ne) - .def("__le__", &MessageId_le) - .def("__lt__", &MessageId_lt) - .def("__ge__", &MessageId_ge) - .def("__gt__", &MessageId_gt) - .def("ledger_id", &MessageId::ledgerId) - .def("entry_id", &MessageId::entryId) - .def("batch_index", &MessageId::batchIndex) - .def("partition", &MessageId::partition) - .add_static_property("earliest", make_getter(&_MessageId_earliest)) - .add_static_property("latest", make_getter(&_MessageId_latest)) - .def("serialize", &MessageId_serialize) - .def("deserialize", &MessageId::deserialize).staticmethod("deserialize") - ; + .def(init<int32_t, int64_t, int64_t, int32_t>()) + .def("__str__", &MessageId_str) + .def("__eq__", &MessageId_eq) + .def("__ne__", &MessageId_ne) + .def("__le__", &MessageId_le) + .def("__lt__", &MessageId_lt) + .def("__ge__", &MessageId_ge) + .def("__gt__", &MessageId_gt) + .def("ledger_id", &MessageId::ledgerId) + .def("entry_id", &MessageId::entryId) + .def("batch_index", &MessageId::batchIndex) + .def("partition", &MessageId::partition) + .add_static_property("earliest", make_getter(&_MessageId_earliest)) + .add_static_property("latest", make_getter(&_MessageId_latest)) + .def("serialize", &MessageId_serialize) + .def("deserialize", &MessageId::deserialize) + .staticmethod("deserialize"); class_<Message>("Message") - .def("properties", &Message_properties) - .def("data", &Message_data) - .def("length", &Message::getLength) - .def("partition_key", &Message::getPartitionKey, return_value_policy<copy_const_reference>()) - .def("publish_timestamp", &Message::getPublishTimestamp) - .def("event_timestamp", &Message::getEventTimestamp) - .def("message_id", &Message_getMessageId, return_value_policy<copy_const_reference>()) - .def("__str__", &Message_str) - .def("topic_name", &Topic_name_str) - .def("redelivery_count", &Message::getRedeliveryCount) - .def("schema_version", &schema_version_str) - ; - - MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, uint32_t batchSize) = &MessageBatch::parseFrom; + .def("properties", &Message_properties) + .def("data", &Message_data) + .def("length", &Message::getLength) + .def("partition_key", &Message::getPartitionKey, return_value_policy<copy_const_reference>()) + .def("publish_timestamp", &Message::getPublishTimestamp) + .def("event_timestamp", &Message::getEventTimestamp) + .def("message_id", &Message_getMessageId, return_value_policy<copy_const_reference>()) + .def("__str__", &Message_str) + .def("topic_name", &Topic_name_str) + .def("redelivery_count", &Message::getRedeliveryCount) + .def("schema_version", &schema_version_str); + + MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, + uint32_t batchSize) = &MessageBatch::parseFrom; class_<MessageBatch>("MessageBatch") - .def("with_message_id", &MessageBatch::withMessageId, return_self<>()) - .def("parse_from", MessageBatchParseFromString, return_self<>()) - .def("messages", &MessageBatch::messages, return_value_policy<copy_const_reference>()) - ; + .def("with_message_id", &MessageBatch::withMessageId, return_self<>()) + .def("parse_from", MessageBatchParseFromString, return_self<>()) + .def("messages", &MessageBatch::messages, return_value_policy<copy_const_reference>()); - class_<std::vector<Message> >("Messages") - .def(vector_indexing_suite<std::vector<Message> >() ); + class_<std::vector<Message> >("Messages").def(vector_indexing_suite<std::vector<Message> >()); } diff --git a/pulsar-client-cpp/python/src/producer.cc b/pulsar-client-cpp/python/src/producer.cc index 343650f..345639e 100644 --- a/pulsar-client-cpp/python/src/producer.cc +++ b/pulsar-client-cpp/python/src/producer.cc @@ -25,11 +25,10 @@ extern boost::python::object MessageId_serialize(const MessageId& msgId); boost::python::object Producer_send(Producer& producer, const Message& message) { Result res; MessageId messageId; - Py_BEGIN_ALLOW_THREADS - res = producer.send(message, messageId); + Py_BEGIN_ALLOW_THREADS res = producer.send(message, messageId); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); return MessageId_serialize(messageId); } @@ -54,57 +53,55 @@ void Producer_sendAsync(Producer& producer, const Message& message, py::object c PyObject* pyCallback = callback.ptr(); Py_XINCREF(pyCallback); - Py_BEGIN_ALLOW_THREADS - producer.sendAsync(message, std::bind(Producer_sendAsyncCallback, pyCallback, - std::placeholders::_1, std::placeholders::_2)); + Py_BEGIN_ALLOW_THREADS producer.sendAsync( + message, + std::bind(Producer_sendAsyncCallback, pyCallback, std::placeholders::_1, std::placeholders::_2)); Py_END_ALLOW_THREADS } void Producer_flush(Producer& producer) { Result res; - Py_BEGIN_ALLOW_THREADS - res = producer.flush(); + Py_BEGIN_ALLOW_THREADS res = producer.flush(); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); } void Producer_close(Producer& producer) { Result res; - Py_BEGIN_ALLOW_THREADS - res = producer.close(); + Py_BEGIN_ALLOW_THREADS res = producer.close(); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); } void export_producer() { using namespace boost::python; class_<Producer>("Producer", no_init) - .def("topic", &Producer::getTopic, "return the topic to which producer is publishing to", - return_value_policy<copy_const_reference>()) - .def("producer_name", &Producer::getProducerName, - "return the producer name which could have been assigned by the system or specified by the client", - return_value_policy<copy_const_reference>()) - .def("last_sequence_id", &Producer::getLastSequenceId) - .def("send", &Producer_send, - "Publish a message on the topic associated with this Producer.\n" - "\n" - "This method will block until the message will be accepted and persisted\n" - "by the broker. In case of errors, the client library will try to\n" - "automatically recover and use a different broker.\n" - "\n" - "If it wasn't possible to successfully publish the message within the sendTimeout,\n" - "an error will be returned.\n" - "\n" - "This method is equivalent to asyncSend() and wait until the callback is triggered.\n" - "\n" - "@param msg message to publish\n") - .def("send_async", &Producer_sendAsync) - .def("flush", &Producer_flush, - "Flush all the messages buffered in the client and wait until all messages have been\n" - "successfully persisted\n") - .def("close", &Producer_close) - ; + .def("topic", &Producer::getTopic, "return the topic to which producer is publishing to", + return_value_policy<copy_const_reference>()) + .def("producer_name", &Producer::getProducerName, + "return the producer name which could have been assigned by the system or specified by the " + "client", + return_value_policy<copy_const_reference>()) + .def("last_sequence_id", &Producer::getLastSequenceId) + .def("send", &Producer_send, + "Publish a message on the topic associated with this Producer.\n" + "\n" + "This method will block until the message will be accepted and persisted\n" + "by the broker. In case of errors, the client library will try to\n" + "automatically recover and use a different broker.\n" + "\n" + "If it wasn't possible to successfully publish the message within the sendTimeout,\n" + "an error will be returned.\n" + "\n" + "This method is equivalent to asyncSend() and wait until the callback is triggered.\n" + "\n" + "@param msg message to publish\n") + .def("send_async", &Producer_sendAsync) + .def("flush", &Producer_flush, + "Flush all the messages buffered in the client and wait until all messages have been\n" + "successfully persisted\n") + .def("close", &Producer_close); } diff --git a/pulsar-client-cpp/python/src/pulsar.cc b/pulsar-client-cpp/python/src/pulsar.cc index a46ce53..e591b73 100644 --- a/pulsar-client-cpp/python/src/pulsar.cc +++ b/pulsar-client-cpp/python/src/pulsar.cc @@ -32,7 +32,6 @@ void export_exceptions(); PyObject* get_exception_class(Result result); - static void translateException(const PulsarException& ex) { std::string err = "Pulsar error: "; err += strResult(ex._result); @@ -40,8 +39,7 @@ static void translateException(const PulsarException& ex) { PyErr_SetString(get_exception_class(ex._result), err.c_str()); } -BOOST_PYTHON_MODULE(_pulsar) -{ +BOOST_PYTHON_MODULE(_pulsar) { py::register_exception_translator<PulsarException>(translateException); // Initialize thread support so that we can grab the GIL mutex diff --git a/pulsar-client-cpp/python/src/reader.cc b/pulsar-client-cpp/python/src/reader.cc index fec65da..668fb94 100644 --- a/pulsar-client-cpp/python/src/reader.cc +++ b/pulsar-client-cpp/python/src/reader.cc @@ -24,12 +24,12 @@ Message Reader_readNext(Reader& reader) { while (true) { Py_BEGIN_ALLOW_THREADS - // Use 100ms timeout to periodically check whether the - // interpreter was interrupted - res = reader.readNext(msg, 100); + // Use 100ms timeout to periodically check whether the + // interpreter was interrupted + res = reader.readNext(msg, 100); Py_END_ALLOW_THREADS - if (res != ResultTimeout) { + if (res != ResultTimeout) { // In case of timeout we keep calling receive() to simulate a // blocking call until a message is available, while breaking // every once in a while to check the Python signal status @@ -49,62 +49,56 @@ Message Reader_readNext(Reader& reader) { Message Reader_readNextTimeout(Reader& reader, int timeoutMs) { Message msg; Result res; - Py_BEGIN_ALLOW_THREADS - res = reader.readNext(msg, timeoutMs); + Py_BEGIN_ALLOW_THREADS res = reader.readNext(msg, timeoutMs); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); return msg; } bool Reader_hasMessageAvailable(Reader& reader) { bool available = false; Result res; - Py_BEGIN_ALLOW_THREADS - res = reader.hasMessageAvailable(available); + Py_BEGIN_ALLOW_THREADS res = reader.hasMessageAvailable(available); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); return available; } void Reader_close(Reader& reader) { Result res; - Py_BEGIN_ALLOW_THREADS - res = reader.close(); + Py_BEGIN_ALLOW_THREADS res = reader.close(); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); } void Reader_seek(Reader& reader, const MessageId& msgId) { Result res; - Py_BEGIN_ALLOW_THREADS - res = reader.seek(msgId); + Py_BEGIN_ALLOW_THREADS res = reader.seek(msgId); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); } void Reader_seek_timestamp(Reader& reader, uint64_t timestamp) { Result res; - Py_BEGIN_ALLOW_THREADS - res = reader.seek(timestamp); + Py_BEGIN_ALLOW_THREADS res = reader.seek(timestamp); Py_END_ALLOW_THREADS - CHECK_RESULT(res); + CHECK_RESULT(res); } void export_reader() { using namespace boost::python; class_<Reader>("Reader", no_init) - .def("topic", &Reader::getTopic, return_value_policy<copy_const_reference>()) - .def("read_next", &Reader_readNext) - .def("read_next", &Reader_readNextTimeout) - .def("has_message_available", &Reader_hasMessageAvailable) - .def("close", &Reader_close) - .def("seek", &Reader_seek) - .def("seek", &Reader_seek_timestamp) - ; + .def("topic", &Reader::getTopic, return_value_policy<copy_const_reference>()) + .def("read_next", &Reader_readNext) + .def("read_next", &Reader_readNextTimeout) + .def("has_message_available", &Reader_hasMessageAvailable) + .def("close", &Reader_close) + .def("seek", &Reader_seek) + .def("seek", &Reader_seek_timestamp); } diff --git a/pulsar-client-cpp/python/src/schema.cc b/pulsar-client-cpp/python/src/schema.cc index 397ec65..cdfcda6 100644 --- a/pulsar-client-cpp/python/src/schema.cc +++ b/pulsar-client-cpp/python/src/schema.cc @@ -21,10 +21,8 @@ void export_schema() { using namespace boost::python; - class_<SchemaInfo>("SchemaInfo", - init<SchemaType, const std::string& , const std::string&>()) - .def("schema_type", &SchemaInfo::getSchemaType) - .def("name", &SchemaInfo::getName, return_value_policy<copy_const_reference>()) - .def("schema", &SchemaInfo::getSchema, return_value_policy<copy_const_reference>()) - ; + class_<SchemaInfo>("SchemaInfo", init<SchemaType, const std::string&, const std::string&>()) + .def("schema_type", &SchemaInfo::getSchemaType) + .def("name", &SchemaInfo::getName, return_value_policy<copy_const_reference>()) + .def("schema", &SchemaInfo::getSchema, return_value_policy<copy_const_reference>()); } diff --git a/pulsar-client-cpp/python/src/utils.h b/pulsar-client-cpp/python/src/utils.h index 457d1f8..5be4473 100644 --- a/pulsar-client-cpp/python/src/utils.h +++ b/pulsar-client-cpp/python/src/utils.h @@ -27,8 +27,7 @@ namespace py = boost::python; struct PulsarException { Result _result; - PulsarException(Result res) : - _result(res) {} + PulsarException(Result res) : _result(res) {} }; inline void CHECK_RESULT(Result res) {
