This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8fdfb42  [ISSUES #3048]add example of OnewayProducer and 
ScheduledMessage (#3053)
8fdfb42 is described below

commit 8fdfb42ac8ceea24c7c1b6352df59ff8b585acbb
Author: zplovekq <[email protected]>
AuthorDate: Mon Jan 24 13:54:08 2022 +0800

    [ISSUES #3048]add example of OnewayProducer and ScheduledMessage (#3053)
    
    * fix the issue #2994
    add the example for OnewayProducer and ScheduledMessageConsumer.
    remove a unused line and add notes.
    
    * rollback
    
    * change code style
    
    * modify the code style
    
    Co-authored-by: zoupeinie <[email protected]>
---
 .../example/schedule/ScheduledMessageConsumer.java | 51 ++++++++++++++++++++++
 .../example/schedule/ScheduledMessageProducer.java | 41 +++++++++++++++++
 .../rocketmq/example/simple/OnewayProducer.java    | 45 +++++++++++++++++++
 3 files changed, 137 insertions(+)

diff --git 
a/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageConsumer.java
 
b/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageConsumer.java
new file mode 100644
index 0000000..fdb4c86
--- /dev/null
+++ 
b/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageConsumer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.schedule;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class ScheduledMessageConsumer {
+    
+    public static void main(String[] args) throws Exception {
+        // Instantiate message consumer
+        DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("ExampleConsumer");
+        // Subscribe topics
+        consumer.subscribe("TestTopic", "*");
+        // Register message listener
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
messages, ConsumeConcurrentlyContext context) {
+                for (MessageExt message : messages) {
+                    // Print approximate delay time period
+                    System.out.printf("Receive message[msgId=%s %d  ms 
later]\n", message.getMsgId(),
+                            System.currentTimeMillis() - 
message.getStoreTimestamp());
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        // Launch consumer
+        consumer.start();
+        //info:to see the time effect, run the consumer first , it will wait 
for the msg
+        //then start the producer
+    }
+}
\ No newline at end of file
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageProducer.java
 
b/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageProducer.java
new file mode 100644
index 0000000..0f6b722
--- /dev/null
+++ 
b/example/src/main/java/org/apache/rocketmq/example/schedule/ScheduledMessageProducer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.schedule;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+public class ScheduledMessageProducer {
+    public static void main(String[] args) throws Exception {
+        // Instantiate a producer to send scheduled messages
+        DefaultMQProducer producer = new 
DefaultMQProducer("ExampleProducerGroup");
+        // Launch producer
+        producer.start();
+        int totalMessagesToSend = 100;
+        for (int i = 0; i < totalMessagesToSend; i++) {
+            Message message = new Message("TestTopic", ("Hello scheduled 
message " + i).getBytes());
+            // This message will be delivered to consumer 10 seconds later.
+            message.setDelayTimeLevel(3);
+            // Send the message
+            producer.send(message);
+        }
+        
+        // Shutdown producer after use.
+        producer.shutdown();
+    }
+    
+}
\ No newline at end of file
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/simple/OnewayProducer.java 
b/example/src/main/java/org/apache/rocketmq/example/simple/OnewayProducer.java
new file mode 100644
index 0000000..6932f66
--- /dev/null
+++ 
b/example/src/main/java/org/apache/rocketmq/example/simple/OnewayProducer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class OnewayProducer {
+    public static void main(String[] args) throws Exception {
+        //Instantiate with a producer group name.
+        DefaultMQProducer producer = new 
DefaultMQProducer("please_rename_unique_group_name");
+        // Specify name server addresses.
+        producer.setNamesrvAddr("localhost:9876");
+        //Launch the instance.
+        producer.start();
+        for (int i = 0; i < 100; i++) {
+            //Create a message instance, specifying topic, tag and message 
body.
+            Message msg = new Message("TopicTest" /* Topic */,
+                    "TagA" /* Tag */,
+                    ("Hello RocketMQ " +
+                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 
Message body */
+            );
+            //Call send message to deliver message to one of brokers.
+            producer.sendOneway(msg);
+        }
+        //Wait for sending to complete
+        Thread.sleep(5000);
+        producer.shutdown();
+    }
+}
\ No newline at end of file

Reply via email to