This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5e415bea3 [INLONG-4969][TubeMQ] Python SDK for Producing Message
(#6700)
5e415bea3 is described below
commit 5e415bea34ce4f3610b0c5dc5ca90e8ef846bf12
Author: Xue Huanran <[email protected]>
AuthorDate: Mon Dec 5 11:54:53 2022 +0800
[INLONG-4969][TubeMQ] Python SDK for Producing Message (#6700)
Co-authored-by: huanranxue <[email protected]>
---
.../tubemq-client-python/README.md | 37 +++++++-
.../tubemq-client-python/src/cpp/tubemq_client.cc | 11 +++
.../tubemq-client-python/src/cpp/tubemq_config.cc | 5 +
.../tubemq-client-python/src/cpp/tubemq_message.cc | 3 +-
.../src/python/example/test_producer.py | 102 +++++++++++++++++++++
.../src/python/tubemq/__init__.py | 1 +
.../src/python/tubemq/client.py | 60 ++++++++++++
7 files changed, 217 insertions(+), 2 deletions(-)
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md
index 673fe4615..f0424eccb 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md
@@ -28,10 +28,45 @@ pip install ./
#### Examples
##### Producer example
+
+This is a simple example of how to use the Python TubeMQ producer. As with the
Java/C++ producer, the `master_addr` and `topic_list` should be provided. A more
detailed example is
[`src/python/example/test_producer.py`](src/python/example/test_producer.py).
+
+```python
+import time
+import tubemq
+import tubemq_message
+
+topic_list = ['demo']
+MASTER_ADDR = "127.0.0.1:8000"
+
+# Start producer
+producer = tubemq.Producer(MASTER_ADDR)
+
+# publish the topic
+producer.publish(topic_list)
+
+# wait until the first heartbeat to the master is ready
+time.sleep(10)
+
+# Test Producer
+send_data = "hello_tubemq"
+while True:
+ msg = tubemq_message.Message(topic_list[0], send_data, len(send_data))
+ res = producer.send(msg, is_sync=True) # default is asynchronous mode,
convenience for demo
+ if res:
+ print("Push successfully!!!")
+
+# Stop the producer
+producer.stop()
+
+```
+
+
+
##### Consumer example
The following example creates a TubeMQ consumer with a master IP address, a
group name, and a subscribed topic list. The consumer receives incoming
messages, prints the length of messages that arrive, and acknowledges each
message to the TubeMQ broker.
-```
+```python
import time
import tubemq
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_client.cc
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_client.cc
index 97ba9e704..1e303c628 100644
---
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_client.cc
+++
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_client.cc
@@ -18,6 +18,8 @@
*/
#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+#include <pybind11/functional.h>
#include "tubemq/tubemq_client.h"
namespace py = pybind11;
@@ -37,4 +39,13 @@ PYBIND11_MODULE(tubemq_client, m) {
.def("getMessage", &TubeMQConsumer::GetMessage)
.def("confirm", &TubeMQConsumer::Confirm)
.def("getCurConsumedInfo", &TubeMQConsumer::GetCurConsumedInfo);
+
+ py::class_<TubeMQProducer>(m, "TubeMQProducer")
+ .def(py::init<>())
+ .def("start", &TubeMQProducer::Start)
+ .def("shutDown", &TubeMQProducer::ShutDown)
+ .def("publishTopics", &TubeMQProducer::Publish)
+ .def("sendMessage", static_cast<bool (TubeMQProducer::*)(string&,
const Message&)>(&TubeMQProducer::SendMessage))
+ .def("sendMessage", static_cast<void (TubeMQProducer::*)(const
Message&,
+ const
std::function<void(const ErrorCode&)>&)>(&TubeMQProducer::SendMessage));
}
\ No newline at end of file
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_config.cc
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_config.cc
index 3a1289162..a768e608a 100644
---
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_config.cc
+++
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_config.cc
@@ -29,6 +29,11 @@ using std::set;
using std::string;
PYBIND11_MODULE(tubemq_config, m) {
+ py::class_<ProducerConfig>(m, "ProducerConfig")
+ .def(py::init<>())
+ .def("setRpcReadTimeoutMs", &ProducerConfig::SetRpcReadTimeoutMs)
+ .def("setMasterAddrInfo", &ProducerConfig::SetMasterAddrInfo);
+
py::enum_<ConsumePosition>(m, "ConsumePosition")
.value("kConsumeFromFirstOffset",
ConsumePosition::kConsumeFromFirstOffset)
.value("kConsumeFromLatestOffset",
ConsumePosition::kConsumeFromLatestOffset)
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_message.cc
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_message.cc
index 752c3a52e..fb56447ed 100644
---
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_message.cc
+++
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_message.cc
@@ -47,5 +47,6 @@ PYBIND11_MODULE(tubemq_message, m) {
.def("hasProperty", &Message::HasProperty)
.def("getProperty", &Message::GetProperty)
.def("getFilterItem", &Message::GetFilterItem)
- .def("addProperty", &Message::AddProperty);
+ .def("addProperty", &Message::AddProperty)
+ .def("putSystemHeader", &Message::PutSystemHeader);
}
\ No newline at end of file
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/example/test_producer.py
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/example/test_producer.py
new file mode 100644
index 000000000..e15d78cbf
--- /dev/null
+++
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/example/test_producer.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import tubemq
+import tubemq_message
+import tubemq_errcode
+import argparse
+import time
+import datetime
+
+from threading import Lock
+
+kTotalCounter = 0
+kSuccessCounter = 0
+kFailCounter = 0
+counter_lock = Lock()
+
+# Reference: java producer: MixedUtils.buildTestData, only for demo
+def build_test_data(msg_data_size):
+ transmit_data = "This is a test data!"
+ data = ""
+ while (len(data) + len(transmit_data)) <= msg_data_size:
+ data += transmit_data
+ if len(data) < msg_data_size:
+ data += transmit_data[:msg_data_size - len(data)]
+ return data
+
+def send_callback(error_code):
+ global counter_lock
+ global kTotalCounter
+ global kSuccessCounter
+ global kFailCounter
+ with counter_lock:
+ kTotalCounter += 1
+ if error_code == tubemq_errcode.kErrSuccess:
+ kSuccessCounter += 1
+ else:
+ kFailCounter += 1
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--master_servers", type=str, required=True,
+ help="The master address(es) to connect to, the format is
master1_ip:port[,master2_ip:port]")
+parser.add_argument("--topics", type=str, required=True, help="The topic
names.")
+parser.add_argument("--conf_file", type=str,
default="/tubemq-python/src/python/tubemq/client.conf",
+ help="The path of configuration file.")
+parser.add_argument("--sync_produce", type=int, default=0, help="Whether
synchronous production.")
+parser.add_argument("--msg_count", type=int, default=10, help="The number of
messages to send.")
+parser.add_argument("--msg_data_size", type=int, default=1000, help="The
message size, (0, 1024 * 1024) bytes.")
+
+params = parser.parse_args()
+
+# start producer
+producer = tubemq.Producer(params.master_servers)
+producer.publish(params.topics)
+
+# wait until the first heartbeat to the master is ready
+time.sleep(10)
+
+send_data = build_test_data(params.msg_data_size)
+curr_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
+# for test, only take the first topic
+first_topic = params.topics if isinstance(params.topics, str) else
params.topics.split(",")[0]
+
+t0 = time.time()
+for i in range(params.msg_count):
+ msg = tubemq_message.Message(first_topic, send_data, len(send_data))
+ msg.putSystemHeader(str(i), curr_time)
+ if params.sync_produce:
+ res = producer.send(msg, is_sync=True)
+ kTotalCounter += 1
+ if res:
+ kSuccessCounter += 1
+ else:
+ kFailCounter += 1
+ else:
+ producer.send(msg, callback=send_callback)
+
+while kTotalCounter < params.msg_count:
+ time.sleep(1e-6)
+
+t1 = time.time()
+print("Python producer send costs {} seconds.".format(t1 - t0))
+
+# Stop producer
+producer.stop()
+
\ No newline at end of file
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/tubemq/__init__.py
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/tubemq/__init__.py
index 2fd51b525..711138d74 100644
---
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/tubemq/__init__.py
+++
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/tubemq/__init__.py
@@ -17,4 +17,5 @@
# under the License.
from .client import Consumer
+from .client import Producer
from .tube_msg import TubeMsg
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
index a30cc2ecf..188774832 100644
---
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
+++
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
@@ -27,6 +27,66 @@ import tubemq_tdmsg # pylint: disable=unused-import
import tubemq_message # pylint: disable=unused-import
+class Producer(tubemq_client.TubeMQProducer):
+ def __init__(self,
+ master_addr,
+ rpc_read_timeout_ms=20000,
+ conf_file=os.path.join(os.path.dirname(__file__),
"client.conf")):
+ super(Producer, self).__init__()
+
+ producer_config = tubemq_config.ProducerConfig()
+ producer_config.setRpcReadTimeoutMs(rpc_read_timeout_ms)
+
+ err_info = ""
+ result = producer_config.setMasterAddrInfo(err_info, master_addr)
+ if not result:
+ print("Set Master AddrInfo failure:", err_info)
+ exit(1)
+
+ result = tubemq_client.startTubeMQService(err_info, conf_file)
+ if not result:
+ print("StartTubeMQService failure:", err_info)
+ exit(1)
+
+ result = self.start(err_info, producer_config)
+ if not result:
+ print("Initial producer failure, error is:", err_info)
+ exit(1)
+
+ def publish(self, topic_list):
+ if not isinstance(topic_list, (tuple, list, set, str)):
+ raise TypeError("Accepted types: `list`, `tuple`, `set` or `str`,
get {}".format(type(topic_list)))
+ if isinstance(topic_list, (tuple, list)):
+ topic_list = set(topic_list)
+ elif isinstance(topic_list, str):
+ topic_list = {topic_list}
+
+ err_info = ""
+ result = self.publishTopics(err_info, topic_list)
+ if not result:
+ print("Python Producer push topics failed, error is:", err_info)
+ exit(1)
+
+ def send(self, msg, is_sync=False, callback=None):
+ if is_sync:
+ err_info = ""
+ result = self.sendMessage(err_info, msg)
+ if not result:
+ print("Send Message failure, error is:", err_info)
+ return result
+ else:
+ if callback is None:
+ raise ValueError("The callback function should be provided
when sending message async.")
+ self.sendMessage(msg, callback)
+
+ def stop(self):
+ err_info = ''
+ result = self.shutDown()
+ result = tubemq_client.stopTubeMQService(err_info)
+ if not result:
+ print("StopTubeMQService failure, reason is:" + err_info)
+ exit(1)
+
class Consumer(tubemq_client.TubeMQConsumer):
def __init__(self,
master_addr,