This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-python.git
The following commit(s) were added to refs/heads/master by this push:
new 143af50 Support ssl. (#157)
143af50 is described below
commit 143af507040d30173908b96b81f338e34e1f81e9
Author: hexueyuan <[email protected]>
AuthorDate: Thu Dec 18 14:48:36 2025 +0800
Support ssl. (#157)
* Support ssl.
* Added compatibility code for the SSL interface.
---------
Co-authored-by: hexueyuan <[email protected]>
---
rocketmq/client.py | 14 ++++++++++++++
rocketmq/ffi.py | 14 ++++++++++++++
samples/consumer.py | 4 +++-
samples/producer.py | 17 +++++++++++++++++
4 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/rocketmq/client.py b/rocketmq/client.py
index fecac9c..bb8e9d4 100644
--- a/rocketmq/client.py
+++ b/rocketmq/client.py
@@ -262,6 +262,13 @@ class Producer(object):
def set_message_trace(self, message_trace):
ffi_check(dll.SetProducerMessageTrace(self._handle, message_trace and
TraceModel.OPEN or TraceModel.CLOSE))
+ def set_ssl_enable(self, enable):
+ ssl_enable_code = 1 if enable else 0
+ ffi_check(dll.SetProducerSsl(self._handle, ssl_enable_code))
+
+ def set_ssl_property_file(self, file_path):
+ ffi_check(dll.SetProducerSslPropertyFile(self._handle,
_to_bytes(file_path)))
+
def start(self):
ffi_check(dll.StartProducer(self._handle))
@@ -401,6 +408,13 @@ class PushConsumer(object):
_to_bytes(channel)
))
+ def set_ssl_enable(self, enable):
+ ssl_enable_code = 1 if enable else 0
+ ffi_check(dll.SetPushConsumerSsl(self._handle, ssl_enable_code))
+
+ def set_ssl_property_file(self, file_path):
+ ffi_check(dll.SetPushConsumerSslPropertyFile(self._handle,
_to_bytes(file_path)))
+
def subscribe(self, topic, callback, expression='*'):
def _on_message(consumer, msg):
exc = None
diff --git a/rocketmq/ffi.py b/rocketmq/ffi.py
index b4f6a11..0eca2ca 100644
--- a/rocketmq/ffi.py
+++ b/rocketmq/ffi.py
@@ -207,6 +207,13 @@ dll.SetProducerMaxMessageSize.argtypes = [c_void_p, c_int]
dll.SetProducerMaxMessageSize.restype = _CStatus
dll.SetProducerMessageTrace.argtypes = [c_void_p, TraceModel]
dll.SetProducerMessageTrace.restype = _CStatus
+try:
+ dll.SetProducerSsl.argtypes = [c_void_p, c_int]
+ dll.SetProducerSsl.restype = _CStatus
+ dll.SetProducerSslPropertyFile.argtypes = [c_void_p, c_char_p]
+ dll.SetProducerSslPropertyFile.restype = _CStatus
+except AttributeError:
+ pass
dll.SendMessageSync.argtypes = [c_void_p, c_void_p, POINTER(_CSendResult)]
dll.SendMessageSync.restype = _CStatus
dll.SendMessageOneway.argtypes = [c_void_p, c_void_p]
@@ -271,6 +278,13 @@ dll.SetPushConsumerMessageModel.restype = _CStatus
dll.SetPushConsumerLogLevel.restype = _CStatus
dll.SetPushConsumerMessageTrace.argtypes = [c_void_p, TraceModel]
dll.SetPushConsumerMessageTrace.restype = _CStatus
+try:
+ dll.SetPushConsumerSsl.argtypes = [c_void_p, c_int]
+ dll.SetPushConsumerSsl.restype = _CStatus
+ dll.SetPushConsumerSslPropertyFile.argtypes = [c_void_p, c_char_p]
+ dll.SetPushConsumerSslPropertyFile.restype = _CStatus
+except AttributeError:
+ pass
# Misc
dll.GetLatestErrorMessage.argtypes = []
diff --git a/samples/consumer.py b/samples/consumer.py
index b95da79..bf36c4a 100644
--- a/samples/consumer.py
+++ b/samples/consumer.py
@@ -27,7 +27,9 @@ def callback(msg):
def start_consume_message():
consumer = PushConsumer('consumer_group')
consumer.set_name_server_address('127.0.0.1:9876')
- consumer.subscribe('TopicTest', callback)
+ consumer.subscribe('BenchmarkTest', callback)
+ # consumer.set_ssl_enable(True)
+ # consumer.set_ssl_property_file("/etc/rocketmq/tls.properties")
print ('start consume message')
consumer.start()
diff --git a/samples/producer.py b/samples/producer.py
index f69534c..4869b77 100644
--- a/samples/producer.py
+++ b/samples/producer.py
@@ -115,6 +115,23 @@ def send_transaction_message(count):
time.sleep(3600)
+def send_message_with_ssl(count):
+ producer = Producer(gid)
+ producer.set_name_server_address(name_srv)
+ producer.set_ssl_enable(True)
+ producer.set_ssl_property_file("/etc/rocketmq/tls.properties")
+ producer.start()
+ for n in range(count):
+ msg = create_message()
+ producer.start()
+ for n in range(count):
+ msg = create_message()
+ ret = producer.send_sync(msg)
+ print ('send message status: ' + str(ret.status) + ' msgId: ' +
ret.msg_id)
+ print ('send sync message done')
+ producer.shutdown()
+
+
if __name__ == '__main__':
send_message_sync(10)