vongosling closed pull request #10: Add send one way interface and add test code
URL: https://github.com/apache/rocketmq-client-python/pull/10
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/sample/testProducer.py b/sample/testProducer.py
index 558eb6b..6938fd2 100644
--- a/sample/testProducer.py
+++ b/sample/testProducer.py
@@ -42,6 +42,16 @@ def testSendMssage(producer,topic,key,body):
     DestroyMessage(msg)
     print("Done...............")
 
+def testSendMessageOneway(producer, topic, key, body):
+    print("Starting Sending(Oneway).....")
+    msg = CreateMessage(topic)
+    SetMessageBody(msg, body)
+    SetMessageKeys(msg, key)
+    SetMessageTags(msg, "Send Message Oneway Test.")
+    SendMessageOneway(producer,msg)
+    DestroyMessage(msg)
+    print("Done...............")
+
 def releaseProducer(producer):
     ShutdownProducer(producer)
     DestroyProducer(producer)
@@ -59,4 +69,9 @@ def releaseProducer(producer):
     
     print("Now Send Message:",i)
 
+while i < 10:
+    i += 1
+    testSendMessageOneway(producer, topic, key, body)
+    print("Now Send Message One way:",i)
+
 releaseProducer(producer)
diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp
index b6c04b6..75b5167 100644
--- a/src/PythonWrapper.cpp
+++ b/src/PythonWrapper.cpp
@@ -133,6 +133,10 @@ PySendResult PySendMessageSync(void *producer, void *msg) {
     return ret;
 }
 
+int PySendMessageOneway(void *producer, void *msg) {
+    return SendMessageOneway((CProducer *) producer, (CMessage *) msg);
+}
+
 //SendResult
 const char *PyGetSendResultMsgID(CSendResult &sendResult) {
     return (const char *) (sendResult.msgId);
@@ -263,6 +267,7 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) {
     def("SetProducerInstanceName", PySetProducerInstanceName);
     def("SetProducerSessionCredentials", PySetProducerSessionCredentials);
     def("SendMessageSync", PySendMessageSync);
+    def("SendMessageOneway", PySendMessageOneway);
 
     //For Consumer
     def("CreatePushConsumer", PyCreatePushConsumer, return_value_policy<return_opaque_pointer>());
diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h
index 887e6f7..04b3164 100644
--- a/src/PythonWrapper.h
+++ b/src/PythonWrapper.h
@@ -74,6 +74,7 @@ int PySetProducerNameServerAddress(void *producer, const char *namesrv);
 int PySetProducerInstanceName(void *producer, const char *instanceName);
 int PySetProducerSessionCredentials(void *producer, const char *accessKey, const char *secretKey, const char *channel);
 PySendResult PySendMessageSync(void *producer, void *msg);
+int PySendMessageOneway(void *producer, void *msg);
 
 //sendResult
 const char *PyGetSendResultMsgID(CSendResult &sendResult);
diff --git a/test/TestConsumeMessages.py b/test/TestConsumeMessages.py
new file mode 100644
index 0000000..6b9f6d2
--- /dev/null
+++ b/test/TestConsumeMessages.py
@@ -0,0 +1,78 @@
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements.  See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+import __init__
+from librocketmqclientpython import *
+
+import time
+import sys
+
+topic = 'test'
+name_srv = '127.0.0.1:9876'
+tag = 'rmq-tag'
+consumer_group = 'test-consumer-group'
+totalMsg = 0
+
+
+def sigint_handler(signum, frame):
+    global is_sigint_up
+    is_sigint_up = True
+    sys.exit(0)
+
+
+def consumer_message(msg):
+    global totalMsg
+    totalMsg += 1
+    print 'total count %d' % totalMsg
+    print 'topic=%s' % GetMessageTopic(msg)
+    print 'tag=%s' % GetMessageTags(msg)
+    print 'body=%s' % GetMessageBody(msg)
+    print 'msg id=%s' % GetMessageId(msg)
+
+    print 'map.keys %s' % GetMessageKeys(msg)
+
+    print 'map.name %s' % GetMessageProperty(msg, 'name')
+    print 'map.id %s' % GetMessageProperty(msg, 'id')
+    return 0
+
+
+def init_producer(_group, _topic, _tag):
+    consumer = CreatePushConsumer(_group)
+    SetPushConsumerNameServerAddress(consumer, name_srv)
+    SetPushConsumerThreadCount(consumer, 1)
+    Subscribe(consumer, _topic, _tag)
+    RegisterMessageCallback(consumer, consumerMessage)
+    StartPushConsumer(consumer)
+    print 'consumer is ready...'
+    return consumer
+
+
+def start_one_consumer(_group, _topic, _tag):
+    consumer = init_producer(_group, _topic, _tag)
+    i = 1
+    while i <= 10:
+        print 'clock: ' + str(i)
+        i += 1
+        time.sleep(10)
+
+    ShutdownPushConsumer(consumer)
+    DestroyPushConsumer(consumer)
+    print("Consumer Down....")
+
+
+if __name__ == '__main__':
+    start_one_consumer(consumer_group, topic, '*')
diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py
new file mode 100644
index 0000000..69871a2
--- /dev/null
+++ b/test/TestSendMessages.py
@@ -0,0 +1,206 @@
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements.  See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+import __init__
+from librocketmqclientpython import *
+import time
+
+topic = 'test'
+name_srv = '127.0.0.1:9876'
+
+
+def init_producer():
+    producer = CreateProducer('TestProducer')
+    SetProducerNameServerAddress(producer, name_srv)
+    StartProducer(producer)
+    return producer
+
+
+producer = init_producer()
+tag = 'rmq-tag'
+key = 'rmq-key'
+
+
+def send_messages_sync(count):
+    for a in range(count):
+        print 'start sending...'
+        body = 'hi rmq, now is ' + \
+            time.strftime('%Y.%m.%d', time.localtime(time.time()))
+        msg = CreateMessage(topic)
+        SetMessageBody(msg, body)
+        result = SendMessageSync(producer, msg)
+        DestroyMessage(msg)
+        print '[RMQ-PRODUCER]start sending...done, msg id = ' + \
+            result.GetMsgId()
+
+
+def send_messages_sync_with_map(count):
+    print 'sending message with properties...id, name'
+    for a in range(count):
+        body = 'hi rmq, now is ' + \
+            time.strftime('%Y.%m.%d', time.localtime(time.time()))
+        msg = CreateMessage(topic)
+        SetMessageBody(msg, body)
+
+        SetMessageProperty(msg, 'name', 'test')
+        SetMessageProperty(msg, 'id', str(time.time()))
+
+        result = SendMessageSync(producer, msg)
+        DestroyMessage(msg)
+        print '[RMQ-PRODUCER]start sending...done, msg id = ' + \
+            result.GetMsgId()
+
+
+def send_messages_with_tag_sync(count):
+    print 'sending message with tag...' + tag
+    for a in range(count):
+        body = 'hi rmq, now is ' + \
+            time.strftime('%Y.%m.%d', time.localtime(time.time()))
+        msg = CreateMessage(topic)
+        SetMessageBody(msg, body)
+        SetMessageTags(msg, tag)
+        result = SendMessageSync(producer, msg)
+        DestroyMessage(msg)
+        print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_with_tag_and_map_sync(count):
+    print 'sending message with tag...' + tag + ' and properties id, name'
+    for a in range(count):
+        body = 'hi rmq, now is ' + \
+            time.strftime('%Y.%m.%d', time.localtime(time.time()))
+        msg = CreateMessage(topic)
+        SetMessageBody(msg, body)
+
+        SetMessageProperty(msg, 'name', 'test')
+        SetMessageProperty(msg, 'id', str(time.time()))
+
+        SetMessageTags(msg, tag)
+        result = SendMessageSync(producer, msg)
+        DestroyMessage(msg)
+        print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_with_key_sync(count):
+    print 'sending message with keys...' + key
+    for a in range(count):
+        body = 'hi rmq, now is ' + \
+            time.strftime('%Y.%m.%d', time.localtime(time.time()))
+        msg = CreateMessage(topic)
+        SetMessageBody(msg, body)
+        SetMessageKeys(msg, key)
+        result = SendMessageSync(producer, msg)
+        DestroyMessage(msg)
+        print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_with_key_and_map_sync(count):
+    print 'sending message with keys...' + key + ' and properties id, name'
+    for a in range(count):
+        body = 'hi rmq, now is ' + \
+            time.strftime('%Y.%m.%d', time.localtime(time.time()))
+        msg = CreateMessage(topic)
+        SetMessageBody(msg, body)
+        SetMessageKeys(msg, key)
+
+        SetMessageProperty(msg, 'name', 'test')
+        SetMessageProperty(msg, 'id', str(time.time()))
+
+        result = SendMessageSync(producer, msg)
+        DestroyMessage(msg)
+        print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_with_key_and_tag_sync(count):
+    key = 'rmq-key'
+    print 'sending message with keys and tag...' + key + ', ' + tag
+    for a in range(count):
+        body = 'hi rmq, now is ' + \
+            time.strftime('%Y.%m.%d', time.localtime(time.time()))
+        msg = CreateMessage(topic)
+        SetMessageBody(msg, body)
+        SetMessageKeys(msg, key)
+        SetMessageTags(msg, tag)
+        result = SendMessageSync(producer, msg)
+        DestroyMessage(msg)
+        print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_with_key_and_tag_and_map_sync(count):
+    key = 'rmq-key'
+    print 'sending message with keys and tag...' + \
+        key + ', ' + tag + ' and properties id, name'
+    for a in range(count):
+        body = 'hi rmq, now is ' + \
+            time.strftime('%Y.%m.%d', time.localtime(time.time()))
+        msg = CreateMessage(topic)
+        SetMessageBody(msg, body)
+        SetMessageKeys(msg, key)
+
+        SetMessageProperty(msg, 'name', 'test')
+        SetMessageProperty(msg, 'id', str(time.time()))
+
+        SetMessageTags(msg, tag)
+        result = SendMessageSync(producer, msg)
+        DestroyMessage(msg)
+        print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_oneway(count):
+    for a in range(count):
+        print 'start sending...'
+        body = 'hi rmq, this is oneway message. now is ' + \
+            time.strftime('%Y.%m.%d', time.localtime(time.time()))
+        msg = CreateMessage(topic)
+        SetMessageBody(msg, body)
+
+        SetMessageKeys(msg, key)
+        SetMessageProperty(msg, 'name', 'test')
+        SetMessageProperty(msg, 'id', str(time.time()))
+
+        SendMessageOneway(producer, msg)
+        DestroyMessage(msg)
+        print 'send oneway is over'
+
+
+def send_delay_messages(producer, topic, count):
+    key = 'rmq-key'
+    print 'start sending message'
+    tag = 'test'
+    for n in range(count):
+        body = 'hi rmq, now is' + str(time.time())
+        msg = CreateMessage(topic)
+        SetMessageBody(msg, body)
+        SetMessageKeys(msg, key)
+        SetMessageProperty(msg, 'name', 'hello world')
+        SetMessageProperty(msg, 'id', str(time.time()))
+        SetMessageTags(msg, tag)
+        # messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
+
+        SetDelayTimeLevel(msg, 5)
+
+        print str(msg)
+        result = SendMessageSync(producer, msg)
+        DestroyMessage(msg)
+        print 'msg id =' + result.GetMsgId()
+
+
+if __name__ == '__main__':
+    # print GetVersion()
+    while True:
+        send_messages_oneway(1)
+        time.sleep(1)
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..f3a3a82
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,22 @@
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements.  See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+import sys
+sys.path.append('/usr/local/lib')
+print("__________Python Version:___________")
+print(sys.version)
+print("______Add Path /usr/local/lib_______")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to