http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java b/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java deleted file mode 100644 index 2e91e34..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.transaction; - -import com.alibaba.rocketmq.client.producer.LocalTransactionState; -import com.alibaba.rocketmq.client.producer.TransactionCheckListener; -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.concurrent.atomic.AtomicInteger; - - -public class TransactionCheckListenerImpl implements TransactionCheckListener { - private AtomicInteger transactionIndex = new AtomicInteger(0); - - - @Override - public LocalTransactionState checkLocalTransactionState(MessageExt msg) { - System.out.printf("server checking TrMsg " + msg.toString() + "%n"); - - int value = transactionIndex.getAndIncrement(); - if ((value % 6) == 0) { - throw new RuntimeException("Could not find db"); - } else if ((value % 5) == 0) { - return LocalTransactionState.ROLLBACK_MESSAGE; - } else if ((value % 4) == 0) { - return LocalTransactionState.COMMIT_MESSAGE; - } - - return LocalTransactionState.UNKNOW; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java b/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java deleted file mode 100644 index cda523a..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.transaction; - -import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; -import com.alibaba.rocketmq.client.producer.LocalTransactionState; -import com.alibaba.rocketmq.common.message.Message; - -import java.util.concurrent.atomic.AtomicInteger; - -public class TransactionExecuterImpl implements LocalTransactionExecuter { - private AtomicInteger transactionIndex = new AtomicInteger(1); - - - @Override - public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { - int value = transactionIndex.getAndIncrement(); - - if (value == 0) { - throw new RuntimeException("Could not find db"); - } else if ((value % 5) == 0) { - return LocalTransactionState.ROLLBACK_MESSAGE; - } else if ((value % 4) == 0) { - return LocalTransactionState.COMMIT_MESSAGE; - } - - return LocalTransactionState.UNKNOW; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java deleted file mode 100644 index 2c4745f..0000000 --- a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.example.transaction; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.client.producer.TransactionCheckListener; -import com.alibaba.rocketmq.client.producer.TransactionMQProducer; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; - -import java.io.UnsupportedEncodingException; - -public class TransactionProducer { - public static void main(String[] args) throws MQClientException, InterruptedException { - TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); - TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); - producer.setCheckThreadPoolMinSize(2); - producer.setCheckThreadPoolMaxSize(2); - producer.setCheckRequestHoldMax(2000); - producer.setTransactionCheckListener(transactionCheckListener); - producer.start(); - - String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; - TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); - for (int i = 0; i < 100; i++) { - try { - Message msg = - new Message("TopicTest", tags[i % tags.length], "KEY" + i, - ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); - SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); - System.out.printf("%s%n", sendResult); - - Thread.sleep(10); - } catch (MQClientException e) { - e.printStackTrace(); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } - } - - for (int i = 0; i < 100000; i++) { - Thread.sleep(1000); - } - producer.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java new file mode 100644 index 0000000..1fbb8a4 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.benchmark; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicLong; + +public class Consumer { + + public static void main(String[] args) throws MQClientException { + Options options = ServerUtil.buildCommandlineOptions(new Options()); + CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser()); + if (null == commandLine) { + System.exit(-1); + } + + final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; + final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer"; + final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true"; + String group = groupPrefix; + if (Boolean.parseBoolean(isPrefixEnable)) { + group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100); + } + + System.out.printf("topic %s group %s prefix %s%n", topic, group, isPrefixEnable); + + final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); + + final Timer timer = new Timer("BenchmarkTimerThread", true); + + final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>(); + + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + snapshotList.addLast(statsBenchmarkConsumer.createSnapshot()); + if (snapshotList.size() > 10) { + snapshotList.removeFirst(); + } + } + }, 1000, 1000); + + timer.scheduleAtFixedRate(new TimerTask() { + private void printStats() { + if (snapshotList.size() >= 10) { + Long[] begin = snapshotList.getFirst(); + Long[] end = snapshotList.getLast(); + + final long consumeTps = + (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L); + final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]); + final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]); + + System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n", + consumeTps, averageB2CRT, averageS2CRT, end[4], end[5] + ); + } + } + + + @Override + public void run() { + try { + this.printStats(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 10000, 10000); + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + consumer.setInstanceName(Long.toString(System.currentTimeMillis())); + + consumer.subscribe(topic, "*"); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeConcurrentlyContext context) { + MessageExt msg = msgs.get(0); + long now = System.currentTimeMillis(); + + statsBenchmarkConsumer.getReceiveMessageTotalCount().incrementAndGet(); + + long born2ConsumerRT = now - msg.getBornTimestamp(); + statsBenchmarkConsumer.getBorn2ConsumerTotalRT().addAndGet(born2ConsumerRT); + + long store2ConsumerRT = now - msg.getStoreTimestamp(); + statsBenchmarkConsumer.getStore2ConsumerTotalRT().addAndGet(store2ConsumerRT); + + compareAndSetMax(statsBenchmarkConsumer.getBorn2ConsumerMaxRT(), born2ConsumerRT); + + compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT); + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.start(); + + System.out.printf("Consumer Started.%n"); + } + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "group", true, "Consumer group name, Default: benchmark_consumer"); + opt.setRequired(false); + options.addOption(opt); + + + opt = new Option("p", "group prefix enable", true, "Consumer group name, Default: false"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + + public static void compareAndSetMax(final AtomicLong target, final long value) { + long prev = target.get(); + while (value > prev) { + boolean updated = target.compareAndSet(prev, value); + if (updated) + break; + + prev = target.get(); + } + } +} + + +class StatsBenchmarkConsumer { + private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L); + + private final AtomicLong born2ConsumerTotalRT = new AtomicLong(0L); + + private final AtomicLong store2ConsumerTotalRT = new AtomicLong(0L); + + private final AtomicLong born2ConsumerMaxRT = new AtomicLong(0L); + + private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L); + + + public Long[] createSnapshot() { + Long[] snap = new Long[]{ + System.currentTimeMillis(), + this.receiveMessageTotalCount.get(), + this.born2ConsumerTotalRT.get(), + this.store2ConsumerTotalRT.get(), + this.born2ConsumerMaxRT.get(), + this.store2ConsumerMaxRT.get(), + }; + + return snap; + } + + + public AtomicLong getReceiveMessageTotalCount() { + return receiveMessageTotalCount; + } + + + public AtomicLong getBorn2ConsumerTotalRT() { + return born2ConsumerTotalRT; + } + + + public AtomicLong getStore2ConsumerTotalRT() { + return store2ConsumerTotalRT; + } + + + public AtomicLong getBorn2ConsumerMaxRT() { + return born2ConsumerMaxRT; + } + + + public AtomicLong getStore2ConsumerMaxRT() { + return store2ConsumerMaxRT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java new file mode 100644 index 0000000..3b13f94 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.benchmark; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; + +import java.io.UnsupportedEncodingException; +import java.util.LinkedList; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +public class Producer { + public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { + + Options options = ServerUtil.buildCommandlineOptions(new Options()); + CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser()); + if (null == commandLine) { + System.exit(-1); + } + + final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; + final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64; + final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128; + final boolean keyEnable = commandLine.hasOption('k') ? Boolean.parseBoolean(commandLine.getOptionValue('k')) : false; + + System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable); + + final Logger log = ClientLogger.getLog(); + + final Message msg = buildMessage(messageSize, topic); + + final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); + + final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer(); + + final Timer timer = new Timer("BenchmarkTimerThread", true); + + final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>(); + + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + snapshotList.addLast(statsBenchmark.createSnapshot()); + if (snapshotList.size() > 10) { + snapshotList.removeFirst(); + } + } + }, 1000, 1000); + + timer.scheduleAtFixedRate(new TimerTask() { + private void printStats() { + if (snapshotList.size() >= 10) { + Long[] begin = snapshotList.getFirst(); + Long[] end = snapshotList.getLast(); + + final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); + + System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n", + sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + } + } + + + @Override + public void run() { + try { + this.printStats(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 10000, 10000); + + final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer"); + producer.setInstanceName(Long.toString(System.currentTimeMillis())); + + if (commandLine.hasOption('n')) { + String ns = commandLine.getOptionValue('n'); + producer.setNamesrvAddr(ns); + } + + producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE); + + producer.start(); + + for (int i = 0; i < threadCount; i++) { + sendThreadPool.execute(new Runnable() { + @Override + public void run() { + while (true) { + try { + final long beginTimestamp = System.currentTimeMillis(); + if (keyEnable) { + msg.setKeys(String.valueOf(beginTimestamp / 1000)); + } + producer.send(msg); + statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); + statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); + final long currentRT = System.currentTimeMillis() - beginTimestamp; + statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT); + long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + while (currentRT > prevMaxRT) { + boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT); + if (updated) + break; + + prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + } + } catch (RemotingException e) { + statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + log.error("[BENCHMARK_PRODUCER] Send Exception", e); + + try { + Thread.sleep(3000); + } catch (InterruptedException e1) { + } + } catch (InterruptedException e) { + statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + try { + Thread.sleep(3000); + } catch (InterruptedException e1) { + } + } catch (MQClientException e) { + statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + log.error("[BENCHMARK_PRODUCER] Send Exception", e); + } catch (MQBrokerException e) { + statsBenchmark.getReceiveResponseFailedCount().incrementAndGet(); + log.error("[BENCHMARK_PRODUCER] Send Exception", e); + try { + Thread.sleep(3000); + } catch (InterruptedException e1) { + } + } + } + } + }); + } + } + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("w", "threadCount", true, "Thread count, Default: 64"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "messageSize", true, "Message Size, Default: 128"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("k", "keyEnable", true, "Message Key Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + private static Message buildMessage(final int messageSize, final String topic) throws UnsupportedEncodingException { + Message msg = new Message(); + msg.setTopic(topic); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < messageSize; i += 10) { + sb.append("hello baby"); + } + + msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); + + return msg; + } +} + + +class StatsBenchmarkProducer { + private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); + + private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); + + private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L); + + private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L); + + private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L); + + private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); + + + public Long[] createSnapshot() { + Long[] snap = new Long[]{ + System.currentTimeMillis(), + this.sendRequestSuccessCount.get(), + this.sendRequestFailedCount.get(), + this.receiveResponseSuccessCount.get(), + this.receiveResponseFailedCount.get(), + this.sendMessageSuccessTimeTotal.get(), + }; + + return snap; + } + + + public AtomicLong getSendRequestSuccessCount() { + return sendRequestSuccessCount; + } + + + public AtomicLong getSendRequestFailedCount() { + return sendRequestFailedCount; + } + + + public AtomicLong getReceiveResponseSuccessCount() { + return receiveResponseSuccessCount; + } + + + public AtomicLong getReceiveResponseFailedCount() { + return receiveResponseFailedCount; + } + + + public AtomicLong getSendMessageSuccessTimeTotal() { + return sendMessageSuccessTimeTotal; + } + + + public AtomicLong getSendMessageMaxRT() { + return sendMessageMaxRT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java new file mode 100644 index 0000000..43f159b --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.benchmark; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.client.producer.*; + +import java.io.UnsupportedEncodingException; +import java.util.LinkedList; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +public class TransactionProducer { + private static int threadCount; + private static int messageSize; + private static boolean ischeck; + private static boolean ischeckffalse; + + + public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { + threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; + messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; + ischeck = args.length >= 3 ? Boolean.parseBoolean(args[2]) : false; + ischeckffalse = args.length >= 4 ? Boolean.parseBoolean(args[3]) : false; + + final Message msg = buildMessage(messageSize); + + final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); + + final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer(); + + final Timer timer = new Timer("BenchmarkTimerThread", true); + + final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>(); + + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + snapshotList.addLast(statsBenchmark.createSnapshot()); + while (snapshotList.size() > 10) { + snapshotList.removeFirst(); + } + } + }, 1000, 1000); + + timer.scheduleAtFixedRate(new TimerTask() { + private void printStats() { + if (snapshotList.size() >= 10) { + Long[] begin = snapshotList.getFirst(); + Long[] end = snapshotList.getLast(); + + final long sendTps = + (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); + + System.out.printf( + "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", + sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]); + } + } + + + @Override + public void run() { + try { + this.printStats(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 10000, 10000); + + final TransactionCheckListener transactionCheckListener = + new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark); + final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); + producer.setInstanceName(Long.toString(System.currentTimeMillis())); + producer.setTransactionCheckListener(transactionCheckListener); + producer.setDefaultTopicQueueNums(1000); + producer.start(); + + final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck); + + for (int i = 0; i < threadCount; i++) { + sendThreadPool.execute(new Runnable() { + @Override + public void run() { + while (true) { + try { + // Thread.sleep(1000); + final long beginTimestamp = System.currentTimeMillis(); + SendResult sendResult = + producer.sendMessageInTransaction(msg, tranExecuter, null); + if (sendResult != null) { + statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); + statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); + } + + final long currentRT = System.currentTimeMillis() - beginTimestamp; + statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT); + long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + while (currentRT > prevMaxRT) { + boolean updated = + statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, + currentRT); + if (updated) + break; + + prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + } + } catch (MQClientException e) { + statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + } + } + } + }); + } + } + + + private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException { + Message msg = new Message(); + msg.setTopic("BenchmarkTest"); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < messageSize; i += 10) { + sb.append("hello baby"); + } + + msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); + + return msg; + } +} + + +class TransactionExecuterBImpl implements LocalTransactionExecuter { + + private boolean ischeck; + + + public TransactionExecuterBImpl(boolean ischeck) { + this.ischeck = ischeck; + } + + + @Override + public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { + if (ischeck) { + return LocalTransactionState.UNKNOW; + } + return LocalTransactionState.COMMIT_MESSAGE; + } +} + + +class TransactionCheckListenerBImpl implements TransactionCheckListener { + private boolean ischeckffalse; + private StatsBenchmarkTProducer statsBenchmarkTProducer; + + + public TransactionCheckListenerBImpl(boolean ischeckffalse, + StatsBenchmarkTProducer statsBenchmarkTProducer) { + this.ischeckffalse = ischeckffalse; + this.statsBenchmarkTProducer = statsBenchmarkTProducer; + } + + + @Override + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { + statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet(); + if (ischeckffalse) { + + return LocalTransactionState.ROLLBACK_MESSAGE; + } + + return LocalTransactionState.COMMIT_MESSAGE; + } +} + + +class StatsBenchmarkTProducer { + private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); + + private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); + + private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L); + + private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L); + + private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L); + + private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); + + private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L); + + + public Long[] createSnapshot() { + Long[] snap = new Long[]{ + System.currentTimeMillis(), + this.sendRequestSuccessCount.get(), + this.sendRequestFailedCount.get(), + this.receiveResponseSuccessCount.get(), + this.receiveResponseFailedCount.get(), + this.sendMessageSuccessTimeTotal.get(), + this.checkRequestSuccessCount.get()}; + + return snap; + } + + + public AtomicLong getSendRequestSuccessCount() { + return sendRequestSuccessCount; + } + + + public AtomicLong getSendRequestFailedCount() { + return sendRequestFailedCount; + } + + + public AtomicLong getReceiveResponseSuccessCount() { + return receiveResponseSuccessCount; + } + + + public AtomicLong getReceiveResponseFailedCount() { + return receiveResponseFailedCount; + } + + + public AtomicLong getSendMessageSuccessTimeTotal() { + return sendMessageSuccessTimeTotal; + } + + + public AtomicLong getSendMessageMaxRT() { + return sendMessageMaxRT; + } + + + public AtomicLong getCheckRequestSuccessCount() { + return checkRequestSuccessCount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java new file mode 100644 index 0000000..aa62a1e --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.broadcast; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + +import java.util.List; + +public class PushConsumer { + + public static void main(String[] args) throws InterruptedException, MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); + + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + + consumer.setMessageModel(MessageModel.BROADCASTING); + + consumer.subscribe("TopicTest", "TagA || TagC || TagD"); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeConcurrentlyContext context) { + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.start(); + System.out.printf("Broadcast Consumer Started.%n"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java new file mode 100644 index 0000000..d0a41f1 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.filter; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + + +public class Consumer { + + public static void main(String[] args) throws InterruptedException, MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); + + String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java"); + consumer.subscribe("TopicFilter7", "org.apache.rocketmq.example.filter.MessageFilterImpl", + filterCode); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeConcurrentlyContext context) { + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.start(); + + System.out.printf("Consumer Started.%n"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java new file mode 100644 index 0000000..d58c28d --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.filter; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class Producer { + public static void main(String[] args) throws MQClientException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); + producer.start(); + + try { + for (int i = 0; i < 6000000; i++) { + Message msg = new Message("TopicFilter7", + "TagA", + "OrderID001", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + + msg.putUserProperty("SequenceId", String.valueOf(i)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } + } catch (Exception e) { + e.printStackTrace(); + } + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java new file mode 100644 index 0000000..a6a3aca --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.operation; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + + +public class Consumer { + + public static void main(String[] args) throws InterruptedException, MQClientException { + CommandLine commandLine = buildCommandline(args); + if (commandLine != null) { + String group = commandLine.getOptionValue('g'); + String topic = commandLine.getOptionValue('t'); + String subscription = commandLine.getOptionValue('s'); + final String returnFailedHalf = commandLine.getOptionValue('f'); + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + consumer.setInstanceName(Long.toString(System.currentTimeMillis())); + + consumer.subscribe(topic, subscription); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + AtomicLong consumeTimes = new AtomicLong(0); + + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeConcurrentlyContext context) { + long currentTimes = this.consumeTimes.incrementAndGet(); + System.out.printf("%-8d %s%n", currentTimes, msgs); + if (Boolean.parseBoolean(returnFailedHalf)) { + if ((currentTimes % 2) == 0) { + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.start(); + + System.out.printf("Consumer Started.%n"); + } + } + + public static CommandLine buildCommandline(String[] args) { + final Options options = new Options(); + Option opt = new Option("h", "help", false, "Print help"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "consumerGroup", true, "Consumer Group Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "Topic Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("s", "subscription", true, "subscription"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("f", "returnFailedHalf", true, "return failed result, for half message"); + opt.setRequired(true); + options.addOption(opt); + + PosixParser parser = new PosixParser(); + HelpFormatter hf = new HelpFormatter(); + hf.setWidth(110); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + if (commandLine.hasOption('h')) { + hf.printHelp("producer", options, true); + return null; + } + } catch (ParseException e) { + hf.printHelp("producer", options, true); + return null; + } + + return commandLine; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java new file mode 100644 index 0000000..54e256b --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.operation; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.commons.cli.*; + +public class Producer { + + public static void main(String[] args) throws MQClientException, InterruptedException { + CommandLine commandLine = buildCommandline(args); + if (commandLine != null) { + String group = commandLine.getOptionValue('g'); + String topic = commandLine.getOptionValue('t'); + String tags = commandLine.getOptionValue('a'); + String keys = commandLine.getOptionValue('k'); + String msgCount = commandLine.getOptionValue('c'); + + DefaultMQProducer producer = new DefaultMQProducer(group); + producer.setInstanceName(Long.toString(System.currentTimeMillis())); + + producer.start(); + + for (int i = 0; i < Integer.parseInt(msgCount); i++) { + try { + Message msg = new Message( + topic, + tags, + keys, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%-8d %s%n", i, sendResult); + } catch (Exception e) { + e.printStackTrace(); + Thread.sleep(1000); + } + } + + producer.shutdown(); + } + } + + public static CommandLine buildCommandline(String[] args) { + final Options options = new Options(); + Option opt = new Option("h", "help", false, "Print help"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "producerGroup", true, "Producer Group Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "Topic Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("a", "tags", true, "Tags Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("k", "keys", true, "Keys Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("c", "msgCount", true, "Message Count"); + opt.setRequired(true); + options.addOption(opt); + + PosixParser parser = new PosixParser(); + HelpFormatter hf = new HelpFormatter(); + hf.setWidth(110); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + if (commandLine.hasOption('h')) { + hf.printHelp("producer", options, true); + return null; + } + } catch (ParseException e) { + hf.printHelp("producer", options, true); + return null; + } + + return commandLine; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java new file mode 100644 index 0000000..7ddfbf7 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.ordermessage; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + + +public class Consumer { + + public static void main(String[] args) throws MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); + + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + + consumer.subscribe("TopicTest", "TagA || TagC || TagD"); + + consumer.registerMessageListener(new MessageListenerOrderly() { + AtomicLong consumeTimes = new AtomicLong(0); + + @Override + public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { + context.setAutoCommit(false); + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + this.consumeTimes.incrementAndGet(); + if ((this.consumeTimes.get() % 2) == 0) { + return ConsumeOrderlyStatus.SUCCESS; + } else if ((this.consumeTimes.get() % 3) == 0) { + return ConsumeOrderlyStatus.ROLLBACK; + } else if ((this.consumeTimes.get() % 4) == 0) { + return ConsumeOrderlyStatus.COMMIT; + } else if ((this.consumeTimes.get() % 5) == 0) { + context.setSuspendCurrentQueueTimeMillis(3000); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + + return ConsumeOrderlyStatus.SUCCESS; + } + }); + + consumer.start(); + System.out.printf("Consumer Started.%n"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java new file mode 100644 index 0000000..84c1da4 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.ordermessage; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.io.UnsupportedEncodingException; +import java.util.List; + +public class Producer { + public static void main(String[] args) throws UnsupportedEncodingException { + try { + MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + producer.start(); + + String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; + for (int i = 0; i < 100; i++) { + int orderId = i % 10; + Message msg = + new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg, new MessageQueueSelector() { + @Override + public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { + Integer id = (Integer) arg; + int index = id % mqs.size(); + return mqs.get(index); + } + }, orderId); + + System.out.printf("%s%n", sendResult); + } + + producer.shutdown(); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (MQBrokerException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java new file mode 100644 index 0000000..43566f0 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.quickstart; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + +public class Consumer { + + public static void main(String[] args) throws InterruptedException, MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); + + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + + consumer.subscribe("TopicTest", "*"); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeConcurrentlyContext context) { + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.start(); + System.out.printf("Consumer Started.%n"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java new file mode 100644 index 0000000..f6bd5df --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.quickstart; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class Producer { + public static void main(String[] args) throws MQClientException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + producer.start(); + + for (int i = 0; i < 1000; i++) { + try { + Message msg = new Message("TopicTest", + "TagA", + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) + ); + SendResult sendResult = producer.send(msg); + LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() { + @Override + public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { + return null; + } + }; + System.out.printf("%s%n", sendResult); + } catch (Exception e) { + e.printStackTrace(); + Thread.sleep(1000); + } + } + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java new file mode 100644 index 0000000..68dbb67 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +import java.io.UnsupportedEncodingException; + + +public class AsyncProducer { + public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { + + DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); + producer.start(); + producer.setRetryTimesWhenSendAsyncFailed(0); + + for (int i = 0; i < 10000000; i++) { + try { + final int index = i; + Message msg = new Message("Jodie_topic_1023", + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + producer.send(msg, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); + } + + @Override + public void onException(Throwable e) { + System.out.printf("%-10d Exception %s %n", index, e); + e.printStackTrace(); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + } + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java b/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java new file mode 100644 index 0000000..2b4ce23 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.simple; + +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.TreeMap; + + +public class CachedQueue { + private final TreeMap<Long, MessageExt> msgCachedTable = new TreeMap<Long, MessageExt>(); + + + public TreeMap<Long, MessageExt> getMsgCachedTable() { + return msgCachedTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java new file mode 100644 index 0000000..b035d57 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; + + +public class Producer { + public static void main(String[] args) throws MQClientException, InterruptedException { + + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); + + producer.start(); + + for (int i = 0; i < 10000000; i++) + try { + { + Message msg = new Message("TopicTest", + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } + + } catch (Exception e) { + e.printStackTrace(); + } + + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java new file mode 100644 index 0000000..8c9ba15 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class PullConsumer { + private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); + + + public static void main(String[] args) throws MQClientException { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); + + consumer.start(); + + Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); + for (MessageQueue mq : mqs) { + System.out.printf("Consume from the queue: " + mq + "%n"); + SINGLE_MQ: + while (true) { + try { + PullResult pullResult = + consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); + System.out.printf("%s%n", pullResult); + putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); + switch (pullResult.getPullStatus()) { + case FOUND: + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + break SINGLE_MQ; + case OFFSET_ILLEGAL: + break; + default: + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + consumer.shutdown(); + } + + private static long getMessageQueueOffset(MessageQueue mq) { + Long offset = OFFSE_TABLE.get(mq); + if (offset != null) + return offset; + + return 0; + } + + private static void putMessageQueueOffset(MessageQueue mq, long offset) { + OFFSE_TABLE.put(mq, offset); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java new file mode 100644 index 0000000..c2d7468 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageQueue; + +public class PullConsumerTest { + public static void main(String[] args) throws MQClientException { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); + consumer.start(); + + try { + MessageQueue mq = new MessageQueue(); + mq.setQueueId(0); + mq.setTopic("TopicTest3"); + mq.setBrokerName("vivedeMacBook-Pro.local"); + + long offset = 26; + + long beginTime = System.currentTimeMillis(); + PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32); + System.out.printf("%s%n", System.currentTimeMillis() - beginTime); + System.out.printf("%s%n", pullResult); + } catch (Exception e) { + e.printStackTrace(); + } + + consumer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java new file mode 100644 index 0000000..d38d679 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.simple; + +import org.apache.rocketmq.client.consumer.MQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullTaskCallback; +import org.apache.rocketmq.client.consumer.PullTaskContext; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + + +public class PullScheduleService { + + public static void main(String[] args) throws MQClientException { + final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); + + scheduleService.setMessageModel(MessageModel.CLUSTERING); + scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() { + + @Override + public void doPullTask(MessageQueue mq, PullTaskContext context) { + MQPullConsumer consumer = context.getPullConsumer(); + try { + + long offset = consumer.fetchConsumeOffset(mq, false); + if (offset < 0) + offset = 0; + + PullResult pullResult = consumer.pull(mq, "*", offset, 32); + System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult); + switch (pullResult.getPullStatus()) { + case FOUND: + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + case OFFSET_ILLEGAL: + break; + default: + break; + } + consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); + + + context.setPullNextDelayTimeMillis(100); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + scheduleService.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java new file mode 100644 index 0000000..5929aff --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + + +public class PushConsumer { + + public static void main(String[] args) throws InterruptedException, MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); + consumer.subscribe("Jodie_topic_1023", "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + consumer.registerMessageListener(new MessageListenerConcurrently() { + + /** + + */ + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.start(); + System.out.printf("Consumer Started.%n"); + } +}