This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new a2ebfa5  [TUBEMQ-410] install python package and simplify 
test_consumer.py (#315)
a2ebfa5 is described below

commit a2ebfa5046f26a35abd26d124cddb91c0b9f727b
Author: chao huang <[email protected]>
AuthorDate: Mon Nov 9 23:55:00 2020 -0800

    [TUBEMQ-410] install python package and simplify test_consumer.py (#315)
---
 tubemq-client-twins/tubemq-client-python/README.md | 30 ++++++-
 tubemq-client-twins/tubemq-client-python/setup.py  |  7 +-
 .../src/python/example/README.md                   |  5 --
 .../src/python/example/consumer/test_consumer.py   | 93 ----------------------
 .../src/python/example/test_consumer.py            | 44 ++++++++++
 .../src/python/tubemq/__init__.py                  | 20 +++++
 .../{example/consumer => tubemq}/client.conf       |  0
 .../src/python/tubemq/client.py                    | 89 +++++++++++++++++++++
 8 files changed, 187 insertions(+), 101 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-python/README.md 
b/tubemq-client-twins/tubemq-client-python/README.md
index 3d9701a..1927e09 100644
--- a/tubemq-client-twins/tubemq-client-python/README.md
+++ b/tubemq-client-twins/tubemq-client-python/README.md
@@ -29,4 +29,32 @@ pip install ./
 #### Examples
 ##### Producer example
 ##### Consumer example
-Consumer example is 
[available](https://github.com/apache/incubator-tubemq/tree/tubemq-client-python/tubemq-client-twins/tubemq-client-python/src/python/example/consumer).
\ No newline at end of file
+
+The following example creates a TubeMQ consumer with a master IP address, a 
group name, and a subscribed topic list. The consumer receives incoming 
messages, prints the length of messages that arrive, and acknowledges each 
message to the TubeMQ broker.
+```
+import time
+import tubemq
+
+topic_list = ['demo']
+master_addr = '127.0.0.1:8000'
+group_name = 'test_group'
+
+# Start consumer
+consumer = tubemq.consumer(master_addr, group_name, topic_list)
+
+# Test consumer
+start_time = time.time()
+while True:
+    msgs = consumer.receive()
+    if msgs:
+        print("GetMessage success, msssage count =", len(msgs))
+        consumer.acknowledge()
+
+    # used for test, consume 10 minutes only
+    stop_time = time.time()
+    if stop_time - start_time > 10 * 60:
+        break
+
+# Stop consumer
+consumer.stop()
+```
diff --git a/tubemq-client-twins/tubemq-client-python/setup.py 
b/tubemq-client-twins/tubemq-client-python/setup.py
index 2866790..537660e 100644
--- a/tubemq-client-twins/tubemq-client-python/setup.py
+++ b/tubemq-client-twins/tubemq-client-python/setup.py
@@ -64,7 +64,7 @@ ext_modules = [
 ]
 
 setup(
-    name="tubemq-client",
+    name="tubemq",
     version=__version__,
     author="dockerzhang",
     author_email="[email protected]",
@@ -75,4 +75,7 @@ setup(
     extras_require={"test": "pytest"},
     cmdclass={"build_ext": build_ext},
     zip_safe=False,
-)
\ No newline at end of file
+    packages=['tubemq'],
+    package_dir={'tubemq': 'src/python/tubemq'},
+    package_data={'tubemq': ['client.conf']}
+)
diff --git 
a/tubemq-client-twins/tubemq-client-python/src/python/example/README.md 
b/tubemq-client-twins/tubemq-client-python/src/python/example/README.md
deleted file mode 100644
index f9f82d9..0000000
--- a/tubemq-client-twins/tubemq-client-python/src/python/example/README.md
+++ /dev/null
@@ -1,5 +0,0 @@
-### TubeMQ Python Client Example
-- Consumer example
-
-Consume example reference to [C++ 
test_consumer](https://github.com/apache/incubator-tubemq/tree/master/tubemq-client-twins/tubemq-client-cpp/example/consumer),
 
-and all methods exposed based on 
[pybind11](https://pybind11.readthedocs.io/en/stable/basics.html).
\ No newline at end of file
diff --git 
a/tubemq-client-twins/tubemq-client-python/src/python/example/consumer/test_consumer.py
 
b/tubemq-client-twins/tubemq-client-python/src/python/example/consumer/test_consumer.py
deleted file mode 100644
index 2789f71..0000000
--- 
a/tubemq-client-twins/tubemq-client-python/src/python/example/consumer/test_consumer.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from __future__ import print_function
-
-import tubemq_client
-import tubemq_config
-import tubemq_errcode
-import tubemq_return
-import tubemq_message
-import time
-
-topic_list = set(['demo'])
-master_addr = '127.0.0.1:8000'
-group_name = 'test_group'
-conf_file = './client.conf'
-err_info = ''
-result = False
-
-py_consumer = tubemq_client.TubeMQConsumer()
-consumer_config = tubemq_config.ConsumerConfig()
-consumer_config.setRpcReadTimeoutMs(20000)
-
-# set master addr
-result = consumer_config.setMasterAddrInfo(err_info, master_addr)
-if not result:
-    print("Set Master AddrInfo failure:", err_info)
-    exit(1)
-
-# set master addr
-result = consumer_config.setGroupConsumeTarget(err_info, group_name, 
topic_list)
-if not result:
-    print("Set GroupConsume Target failure:", err_info)
-    exit(1)
-
-# StartTubeMQService
-result = tubemq_client.startTubeMQService(err_info, conf_file)
-if not result:
-    print("StartTubeMQService failure:", err_info)
-    exit(1)
-
-result = py_consumer.start(err_info, consumer_config)
-if not result:
-    print("Initial consumer failure, error is:", err_info)
-    exit(1)
-
-gentRet = tubemq_return.ConsumerResult()
-confirm_result = tubemq_return.ConsumerResult()
-start_time = time.time()
-while True:
-    result = py_consumer.getMessage(gentRet)
-    if result:
-        msgs = gentRet.getMessageList()
-        print("GetMessage success, msssage count =", len(msgs))
-        result = py_consumer.confirm(gentRet.getConfirmContext(), True, 
confirm_result)
-    else:
-        # 2.2.1 if failure, check error code
-        # print error message if errcode not in
-        # [no partitions assigned, all partitions in use,
-        #    or all partitons idle, reach max position]
-        if not gentRet.getErrCode() == tubemq_errcode.Result.kErrNotFound \
-                or not gentRet.getErrCode() == 
tubemq_errcode.Result.kErrNoPartAssigned \
-                or not gentRet.getErrCode() == 
tubemq_errcode.Result.kErrAllPartInUse \
-                or not gentRet.getErrCode() == 
tubemq_errcode.Result.kErrAllPartWaiting:
-            print('GetMessage failure, err_code=%d, err_msg is:%s', 
gentRet.getErrCode(), gentRet.getErrMessage())
-
-    # used for test, consume 10 minutes only
-    stop_time = time.time()
-    if stop_time - start_time > 10 * 60:
-        break;
-
-# stop and shutdown
-result = py_consumer.shutDown()
-result = tubemq_client.stopTubeMQService(err_info)
-if not result:
-    print("StopTubeMQService failure, reason is:" + err_info)
-    exit(1)
diff --git 
a/tubemq-client-twins/tubemq-client-python/src/python/example/test_consumer.py 
b/tubemq-client-twins/tubemq-client-python/src/python/example/test_consumer.py
new file mode 100644
index 0000000..a08edbf
--- /dev/null
+++ 
b/tubemq-client-twins/tubemq-client-python/src/python/example/test_consumer.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+import tubemq
+
+topic_list = ['demo']
+master_addr = '127.0.0.1:8000'
+group_name = 'test_group'
+
+# Start consumer
+consumer = tubemq.consumer(master_addr, group_name, topic_list)
+
+# Test consumer
+start_time = time.time()
+while True:
+    msgs = consumer.receive()
+    if msgs:
+        print("GetMessage success, msssage count =", len(msgs))
+        consumer.acknowledge()
+
+    # used for test, consume 10 minutes only
+    stop_time = time.time()
+    if stop_time - start_time > 10 * 60:
+        break
+
+# Stop consumer
+consumer.stop()
diff --git 
a/tubemq-client-twins/tubemq-client-python/src/python/tubemq/__init__.py 
b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/__init__.py
new file mode 100644
index 0000000..b22a6dc
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from .client import consumer
diff --git 
a/tubemq-client-twins/tubemq-client-python/src/python/example/consumer/client.conf
 b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.conf
similarity index 100%
rename from 
tubemq-client-twins/tubemq-client-python/src/python/example/consumer/client.conf
rename to tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.conf
diff --git 
a/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py 
b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
new file mode 100644
index 0000000..f29f088
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import print_function
+import os
+import tubemq_client
+import tubemq_config
+import tubemq_errcode
+import tubemq_return
+import tubemq_message
+
+class consumer(tubemq_client.TubeMQConsumer):
+    def __init__(self,
+                 master_addr,
+                 group_name,
+                 topic_list,
+                 RpcReadTimeoutMs=20000,
+                 
conf_file=os.path.join(os.path.dirname(__file__),'client.conf')):
+
+        super(consumer, self).__init__()
+
+        consumer_config = tubemq_config.ConsumerConfig()
+        consumer_config.setRpcReadTimeoutMs(RpcReadTimeoutMs)
+
+        err_info = ''
+        result = consumer_config.setMasterAddrInfo(err_info, master_addr)
+        if not result:
+            print("Set Master AddrInfo failure:", err_info)
+            exit(1)
+
+        result = consumer_config.setGroupConsumeTarget(err_info, group_name, 
set(topic_list))
+        if not result:
+            print("Set GroupConsume Target failure:", err_info)
+            exit(1)
+
+        result = tubemq_client.startTubeMQService(err_info, conf_file)
+        if not result:
+            print("StartTubeMQService failure:", err_info)
+            exit(1)
+
+        result = self.start(err_info, consumer_config)
+        if not result:
+            print("Initial consumer failure, error is:", err_info)
+            exit(1)
+
+        self.getRet = tubemq_return.ConsumerResult()
+        self.confirm_result = tubemq_return.ConsumerResult()
+
+    def receive(self):
+        result = self.getMessage(self.getRet)
+        if result:
+            return self.getRet.getMessageList()
+        else:
+            # 2.2.1 if failure, check error code
+            # print error message if errcode not in
+            # [no partitions assigned, all partitions in use,
+            #    or all partitons idle, reach max position]
+            if not self.getRet.getErrCode() == 
tubemq_errcode.Result.kErrNotFound \
+                    or not self.getRet.getErrCode() == 
tubemq_errcode.Result.kErrNoPartAssigned \
+                    or not self.getRet.getErrCode() == 
tubemq_errcode.Result.kErrAllPartInUse \
+                    or not self.getRet.getErrCode() == 
tubemq_errcode.Result.kErrAllPartWaiting:
+                print('GetMessage failure, err_code=%d, err_msg is:%s', 
self.getRet.getErrCode(), self.getRet.getErrMessage())
+
+    def acknowledge(self):
+        self.confirm(self.getRet.getConfirmContext(), True, 
self.confirm_result)
+
+    def stop(self):
+        err_info = ''
+        result = self.shutDown()
+        result = tubemq_client.stopTubeMQService(err_info)
+        if not result:
+            print("StopTubeMQService failure, reason is:" + err_info)
+            exit(1)

Reply via email to