This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new a2ebfa5 [TUBEMQ-410] install python package and simplify test_consumer.py (#315)
a2ebfa5 is described below
commit a2ebfa5046f26a35abd26d124cddb91c0b9f727b
Author: chao huang <[email protected]>
AuthorDate: Mon Nov 9 23:55:00 2020 -0800
[TUBEMQ-410] install python package and simplify test_consumer.py (#315)
---
tubemq-client-twins/tubemq-client-python/README.md | 30 ++++++-
tubemq-client-twins/tubemq-client-python/setup.py | 7 +-
.../src/python/example/README.md | 5 --
.../src/python/example/consumer/test_consumer.py | 93 ----------------------
.../src/python/example/test_consumer.py | 44 ++++++++++
.../src/python/tubemq/__init__.py | 20 +++++
.../{example/consumer => tubemq}/client.conf | 0
.../src/python/tubemq/client.py | 89 +++++++++++++++++++++
8 files changed, 187 insertions(+), 101 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-python/README.md b/tubemq-client-twins/tubemq-client-python/README.md
index 3d9701a..1927e09 100644
--- a/tubemq-client-twins/tubemq-client-python/README.md
+++ b/tubemq-client-twins/tubemq-client-python/README.md
@@ -29,4 +29,32 @@ pip install ./
#### Examples
##### Producer example
##### Consumer example
-Consumer example is [available](https://github.com/apache/incubator-tubemq/tree/tubemq-client-python/tubemq-client-twins/tubemq-client-python/src/python/example/consumer).
\ No newline at end of file
+
+The following example creates a TubeMQ consumer with a master IP address, a group name, and a subscribed topic list. The consumer receives incoming messages, prints the length of messages that arrive, and acknowledges each message to the TubeMQ broker.
+```
+import time
+import tubemq
+
+topic_list = ['demo']
+master_addr = '127.0.0.1:8000'
+group_name = 'test_group'
+
+# Start consumer
+consumer = tubemq.consumer(master_addr, group_name, topic_list)
+
+# Test consumer
+start_time = time.time()
+while True:
+ msgs = consumer.receive()
+ if msgs:
+ print("GetMessage success, msssage count =", len(msgs))
+ consumer.acknowledge()
+
+ # used for test, consume 10 minutes only
+ stop_time = time.time()
+ if stop_time - start_time > 10 * 60:
+ break
+
+# Stop consumer
+consumer.stop()
+```
diff --git a/tubemq-client-twins/tubemq-client-python/setup.py b/tubemq-client-twins/tubemq-client-python/setup.py
index 2866790..537660e 100644
--- a/tubemq-client-twins/tubemq-client-python/setup.py
+++ b/tubemq-client-twins/tubemq-client-python/setup.py
@@ -64,7 +64,7 @@ ext_modules = [
]
setup(
- name="tubemq-client",
+ name="tubemq",
version=__version__,
author="dockerzhang",
author_email="[email protected]",
@@ -75,4 +75,7 @@ setup(
extras_require={"test": "pytest"},
cmdclass={"build_ext": build_ext},
zip_safe=False,
-)
\ No newline at end of file
+ packages=['tubemq'],
+ package_dir={'tubemq': 'src/python/tubemq'},
+ package_data={'tubemq': ['client.conf']}
+)
diff --git a/tubemq-client-twins/tubemq-client-python/src/python/example/README.md b/tubemq-client-twins/tubemq-client-python/src/python/example/README.md
deleted file mode 100644
index f9f82d9..0000000
--- a/tubemq-client-twins/tubemq-client-python/src/python/example/README.md
+++ /dev/null
@@ -1,5 +0,0 @@
-### TubeMQ Python Client Example
-- Consumer example
-
-Consume example reference to [C++ test_consumer](https://github.com/apache/incubator-tubemq/tree/master/tubemq-client-twins/tubemq-client-cpp/example/consumer),
-and all methods exposed based on [pybind11](https://pybind11.readthedocs.io/en/stable/basics.html).
\ No newline at end of file
diff --git a/tubemq-client-twins/tubemq-client-python/src/python/example/consumer/test_consumer.py b/tubemq-client-twins/tubemq-client-python/src/python/example/consumer/test_consumer.py
deleted file mode 100644
index 2789f71..0000000
--- a/tubemq-client-twins/tubemq-client-python/src/python/example/consumer/test_consumer.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from __future__ import print_function
-
-import tubemq_client
-import tubemq_config
-import tubemq_errcode
-import tubemq_return
-import tubemq_message
-import time
-
-topic_list = set(['demo'])
-master_addr = '127.0.0.1:8000'
-group_name = 'test_group'
-conf_file = './client.conf'
-err_info = ''
-result = False
-
-py_consumer = tubemq_client.TubeMQConsumer()
-consumer_config = tubemq_config.ConsumerConfig()
-consumer_config.setRpcReadTimeoutMs(20000)
-
-# set master addr
-result = consumer_config.setMasterAddrInfo(err_info, master_addr)
-if not result:
- print("Set Master AddrInfo failure:", err_info)
- exit(1)
-
-# set master addr
-result = consumer_config.setGroupConsumeTarget(err_info, group_name, topic_list)
-if not result:
- print("Set GroupConsume Target failure:", err_info)
- exit(1)
-
-# StartTubeMQService
-result = tubemq_client.startTubeMQService(err_info, conf_file)
-if not result:
- print("StartTubeMQService failure:", err_info)
- exit(1)
-
-result = py_consumer.start(err_info, consumer_config)
-if not result:
- print("Initial consumer failure, error is:", err_info)
- exit(1)
-
-gentRet = tubemq_return.ConsumerResult()
-confirm_result = tubemq_return.ConsumerResult()
-start_time = time.time()
-while True:
- result = py_consumer.getMessage(gentRet)
- if result:
- msgs = gentRet.getMessageList()
- print("GetMessage success, msssage count =", len(msgs))
- result = py_consumer.confirm(gentRet.getConfirmContext(), True, confirm_result)
- else:
- # 2.2.1 if failure, check error code
- # print error message if errcode not in
- # [no partitions assigned, all partitions in use,
- # or all partitons idle, reach max position]
- if not gentRet.getErrCode() == tubemq_errcode.Result.kErrNotFound \
- or not gentRet.getErrCode() == tubemq_errcode.Result.kErrNoPartAssigned \
- or not gentRet.getErrCode() == tubemq_errcode.Result.kErrAllPartInUse \
- or not gentRet.getErrCode() == tubemq_errcode.Result.kErrAllPartWaiting:
- print('GetMessage failure, err_code=%d, err_msg is:%s', gentRet.getErrCode(), gentRet.getErrMessage())
-
- # used for test, consume 10 minutes only
- stop_time = time.time()
- if stop_time - start_time > 10 * 60:
- break;
-
-# stop and shutdown
-result = py_consumer.shutDown()
-result = tubemq_client.stopTubeMQService(err_info)
-if not result:
- print("StopTubeMQService failure, reason is:" + err_info)
- exit(1)
diff --git a/tubemq-client-twins/tubemq-client-python/src/python/example/test_consumer.py b/tubemq-client-twins/tubemq-client-python/src/python/example/test_consumer.py
new file mode 100644
index 0000000..a08edbf
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-python/src/python/example/test_consumer.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+import tubemq
+
+topic_list = ['demo']
+master_addr = '127.0.0.1:8000'
+group_name = 'test_group'
+
+# Start consumer
+consumer = tubemq.consumer(master_addr, group_name, topic_list)
+
+# Test consumer
+start_time = time.time()
+while True:
+ msgs = consumer.receive()
+ if msgs:
+ print("GetMessage success, msssage count =", len(msgs))
+ consumer.acknowledge()
+
+ # used for test, consume 10 minutes only
+ stop_time = time.time()
+ if stop_time - start_time > 10 * 60:
+ break
+
+# Stop consumer
+consumer.stop()
diff --git a/tubemq-client-twins/tubemq-client-python/src/python/tubemq/__init__.py b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/__init__.py
new file mode 100644
index 0000000..b22a6dc
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from .client import consumer
diff --git a/tubemq-client-twins/tubemq-client-python/src/python/example/consumer/client.conf b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.conf
similarity index 100%
rename from tubemq-client-twins/tubemq-client-python/src/python/example/consumer/client.conf
rename to tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.conf
diff --git a/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
new file mode 100644
index 0000000..f29f088
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import print_function
+import os
+import tubemq_client
+import tubemq_config
+import tubemq_errcode
+import tubemq_return
+import tubemq_message
+
+class consumer(tubemq_client.TubeMQConsumer):
+ def __init__(self,
+ master_addr,
+ group_name,
+ topic_list,
+ RpcReadTimeoutMs=20000,
+ conf_file=os.path.join(os.path.dirname(__file__),'client.conf')):
+
+ super(consumer, self).__init__()
+
+ consumer_config = tubemq_config.ConsumerConfig()
+ consumer_config.setRpcReadTimeoutMs(RpcReadTimeoutMs)
+
+ err_info = ''
+ result = consumer_config.setMasterAddrInfo(err_info, master_addr)
+ if not result:
+ print("Set Master AddrInfo failure:", err_info)
+ exit(1)
+
+ result = consumer_config.setGroupConsumeTarget(err_info, group_name, set(topic_list))
+ if not result:
+ print("Set GroupConsume Target failure:", err_info)
+ exit(1)
+
+ result = tubemq_client.startTubeMQService(err_info, conf_file)
+ if not result:
+ print("StartTubeMQService failure:", err_info)
+ exit(1)
+
+ result = self.start(err_info, consumer_config)
+ if not result:
+ print("Initial consumer failure, error is:", err_info)
+ exit(1)
+
+ self.getRet = tubemq_return.ConsumerResult()
+ self.confirm_result = tubemq_return.ConsumerResult()
+
+ def receive(self):
+ result = self.getMessage(self.getRet)
+ if result:
+ return self.getRet.getMessageList()
+ else:
+ # 2.2.1 if failure, check error code
+ # print error message if errcode not in
+ # [no partitions assigned, all partitions in use,
+ # or all partitons idle, reach max position]
+ if not self.getRet.getErrCode() == tubemq_errcode.Result.kErrNotFound \
+ or not self.getRet.getErrCode() == tubemq_errcode.Result.kErrNoPartAssigned \
+ or not self.getRet.getErrCode() == tubemq_errcode.Result.kErrAllPartInUse \
+ or not self.getRet.getErrCode() == tubemq_errcode.Result.kErrAllPartWaiting:
+ print('GetMessage failure, err_code=%d, err_msg is:%s', self.getRet.getErrCode(), self.getRet.getErrMessage())
+
+ def acknowledge(self):
+ self.confirm(self.getRet.getConfirmContext(), True, self.confirm_result)
+
+ def stop(self):
+ err_info = ''
+ result = self.shutDown()
+ result = tubemq_client.stopTubeMQService(err_info)
+ if not result:
+ print("StopTubeMQService failure, reason is:" + err_info)
+ exit(1)