This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 8acad06b615063659219f8097de0478b76a10011 Author: gosonzhang <[email protected]> AuthorDate: Mon Dec 14 18:21:36 2020 +0800 [TUBEMQ-449]Adjust Example implementation (#348) Co-authored-by: gosonzhang <[email protected]> --- .../org/apache/tubemq/corebase/utils/Tuple2.java | 53 +++++++ .../tubemq/example/MAMessageProducerExample.java | 158 +++++++++++---------- .../tubemq/example/MessageConsumerExample.java | 60 ++++---- .../tubemq/example/MessageProducerExample.java | 131 +++++++++-------- .../tubemq/example/MessagePullConsumerExample.java | 63 ++++---- .../example/MessagePullSetConsumerExample.java | 62 ++++---- .../tubemq/server/tools/cli/CliProducer.java | 30 ++-- 7 files changed, 292 insertions(+), 265 deletions(-) diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java new file mode 100644 index 0000000..f5626f8 --- /dev/null +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.corebase.utils; + +public class Tuple2<T0, T1> { + + /** Field 0 of the tuple. */ + public T0 f0 = null; + /** Field 1 of the tuple. */ + public T1 f1 = null; + + /** + * Creates a new tuple where all fields are null. + */ + public Tuple2() { + + } + + /** + * Creates a new tuple with field 0 specified. + * + * @param value0 The value for field 0 + */ + public Tuple2(T0 value0) { + this.f0 = value0; + } + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. + * + * @param value0 The value for field 0 + * @param value1 The value for field 1 + */ + public Tuple2(T0 value0, T1 value1) { + this.f0 = value0; + this.f1 = value1; + } +} diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java index f76b077..ea546c9 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java @@ -20,12 +20,10 @@ package org.apache.tubemq.example; import java.nio.ByteBuffer; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -41,8 +39,9 @@ import org.apache.tubemq.client.producer.MessageProducer; import org.apache.tubemq.client.producer.MessageSentCallback; import org.apache.tubemq.client.producer.MessageSentResult; import org.apache.tubemq.corebase.Message; -import org.apache.tubemq.corebase.TErrCodeConstants; +import org.apache.tubemq.corebase.utils.MixedUtils; import org.apache.tubemq.corebase.utils.ThreadUtils; +import org.apache.tubemq.corebase.utils.Tuple2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,19 +51,24 @@ import org.slf4j.LoggerFactory; * to improve throughput from client to broker. */ public class MAMessageProducerExample { - private static final Logger logger = LoggerFactory.getLogger(MAMessageProducerExample.class); + private static final Logger logger = + LoggerFactory.getLogger(MAMessageProducerExample.class); + private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0); private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0); + private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0); + private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0); + private static final List<MessageProducer> PRODUCER_LIST = new ArrayList<>(); private static final int MAX_PRODUCER_NUM = 100; private static final int SESSION_FACTORY_NUM = 10; - private static Set<String> topicSet; + private static Map<String, TreeSet<String>> topicAndFiltersMap; + private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>(); private static int msgCount; - private static int producerCount; + private static int clientCount; private static byte[] sendData; + private static AtomicLong filterMsgCount = new AtomicLong(0); - private final String[] arrayKey = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh"}; - private final Set<String> filters = new TreeSet<>(); private final Map<MessageProducer, Sender> producerMap = new HashMap<>(); private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>(); private final ExecutorService sendExecutorService = @@ -76,13 +80,9 @@ public class MAMessageProducerExample { }); private final AtomicInteger producerIndex = new AtomicInteger(0); - private int keyCount = 0; - private int sentCount = 0; - public MAMessageProducerExample(String masterHostAndPort) throws Exception { - this.filters.add("aaa"); - this.filters.add("bbb"); + public MAMessageProducerExample(String masterHostAndPort) throws Exception { TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort); for (int i = 0; i < SESSION_FACTORY_NUM; i++) { this.sessionFactoryList.add(new TubeMultiSessionFactory(clientConfig)); @@ -90,40 +90,51 @@ public class MAMessageProducerExample { } public static void main(String[] args) { - final String masterHostAndPort = args[0]; - + // get call parameters + final String masterServers = args[0]; final String topics = args[1]; - final List<String> topicList = Arrays.asList(topics.split(",")); - - topicSet = new TreeSet<>(topicList); - msgCount = Integer.parseInt(args[2]); - producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM); - - logger.info("MAMessageProducerExample.main started..."); - - final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory."); + clientCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM); + topicAndFiltersMap = MixedUtils.parseTopicParam(topics); + // initial topic send round + for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) { + if (entry.getValue().isEmpty()) { + topicSendRounds.add(new Tuple2<String, String>(entry.getKey())); + } else { + for (String filter : entry.getValue()) { + topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter)); + } + } + } + // build message's body content + final byte[] transmitData = + StringUtils.getBytesUtf8("This is a test message from multi-session factory."); final ByteBuffer dataBuffer = ByteBuffer.allocate(1024); - while (dataBuffer.hasRemaining()) { int offset = dataBuffer.arrayOffset(); - dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length)); + dataBuffer.put(transmitData, offset, + Math.min(dataBuffer.remaining(), transmitData.length)); } - dataBuffer.flip(); sendData = dataBuffer.array(); + // print started log + logger.info("MAMessageProducerExample.main started..."); try { - MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort); - + // initial producer objects + MAMessageProducerExample messageProducer = + new MAMessageProducerExample(masterServers); messageProducer.startService(); - - while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) { - Thread.sleep(1000); + // wait util sent message's count reachs required count + while (TOTAL_COUNTER.get() < msgCount * clientCount) { + logger.info("Sending, total messages is {}, filter messages is {}", + SENT_SUCC_COUNTER.get(), filterMsgCount.get()); + Thread.sleep(5000); } + logger.info("Finished, total messages is {}, filter messages is {}", + SENT_SUCC_COUNTER.get(), filterMsgCount.get()); messageProducer.producerMap.clear(); messageProducer.shutdown(); - } catch (TubeClientException e) { logger.error("TubeClientException: ", e); } catch (Throwable e) { @@ -137,7 +148,7 @@ public class MAMessageProducerExample { } private void startService() throws TubeClientException { - for (int i = 0; i < producerCount; i++) { + for (int i = 0; i < clientCount; i++) { PRODUCER_LIST.add(createProducer()); } @@ -169,44 +180,44 @@ public class MAMessageProducerExample { public void run() { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm"); try { - producer.publish(topicSet); + producer.publish(topicAndFiltersMap.keySet()); } catch (Throwable t) { logger.error("publish exception: ", t); } - for (int i = 0; i < msgCount; i++) { + long sentCount = 0; + int roundIndex = 0; + int targetCnt = topicSendRounds.size(); + while (msgCount < 0 || sentCount < msgCount) { long millis = System.currentTimeMillis(); - for (String topic : topicSet) { - try { - Message message = new Message(topic, sendData); - message.setAttrKeyVal("index", String.valueOf(1)); - message.setAttrKeyVal("dataTime", String.valueOf(millis)); - - String keyCode = arrayKey[sentCount++ % arrayKey.length]; - - // date format is accurate to minute, not to second - message.putSystemHeader(keyCode, sdf.format(new Date(millis))); - if (filters.contains(keyCode)) { - keyCount++; - } - - // next line sends message synchronously, which is not recommended - //producer.sendMessage(message); - - // send message asynchronously, recommended - producer.sendMessage(message, new DefaultSendCallback()); - } catch (Throwable e1) { - logger.error("sendMessage exception: ", e1); - } - - if (i % 5000 == 0) { - ThreadUtils.sleep(3000); - } else if (i % 4000 == 0) { - ThreadUtils.sleep(2000); - } else if (i % 2000 == 0) { - ThreadUtils.sleep(800); - } else if (i % 1000 == 0) { - ThreadUtils.sleep(400); - } + roundIndex = (int) (sentCount++ % targetCnt); + Tuple2<String, String> target = topicSendRounds.get(roundIndex); + Message message = new Message(target.f0, sendData); + message.setAttrKeyVal("index", String.valueOf(sentCount)); + message.setAttrKeyVal("dataTime", String.valueOf(millis)); + if (target.f1 != null) { + filterMsgCount.incrementAndGet(); + message.putSystemHeader(target.f1, sdf.format(new Date(millis))); + } + try { + // next line sends message synchronously, which is not recommended + //producer.sendMessage(message); + // send message asynchronously, recommended + producer.sendMessage(message, new DefaultSendCallback()); + } catch (Throwable e1) { + TOTAL_COUNTER.incrementAndGet(); + SENT_EXCEPT_COUNTER.incrementAndGet(); + logger.error("sendMessage exception: ", e1); + } + TOTAL_COUNTER.incrementAndGet(); + // only for test, delay inflight message's count + if (sentCount % 5000 == 0) { + ThreadUtils.sleep(3000); + } else if (sentCount % 4000 == 0) { + ThreadUtils.sleep(2000); + } else if (sentCount % 2000 == 0) { + ThreadUtils.sleep(800); + } else if (sentCount % 1000 == 0) { + ThreadUtils.sleep(400); } } try { @@ -221,19 +232,18 @@ public class MAMessageProducerExample { private class DefaultSendCallback implements MessageSentCallback { @Override public void onMessageSent(MessageSentResult result) { + TOTAL_COUNTER.incrementAndGet(); if (result.isSuccess()) { - if (SENT_SUCC_COUNTER.incrementAndGet() % 1000 == 0) { - logger.info("Send {} message, keyCount is {}", SENT_SUCC_COUNTER.get(), keyCount); - } + SENT_SUCC_COUNTER.incrementAndGet(); } else { - if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW) { - logger.error("Send message failed!" + result.getErrMsg()); - } + SENT_FAIL_COUNTER.incrementAndGet(); } } @Override public void onException(Throwable e) { + TOTAL_COUNTER.incrementAndGet(); + SENT_EXCEPT_COUNTER.incrementAndGet(); logger.error("Send message error!", e); } } diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java index 0252d16..866be77 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java @@ -17,8 +17,6 @@ package org.apache.tubemq.example; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeSet; @@ -35,6 +33,7 @@ import org.apache.tubemq.client.exception.TubeClientException; import org.apache.tubemq.client.factory.MessageSessionFactory; import org.apache.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.tubemq.corebase.Message; +import org.apache.tubemq.corebase.utils.MixedUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,59 +54,48 @@ import org.slf4j.LoggerFactory; */ public final class MessageConsumerExample { - private static final Logger logger = LoggerFactory.getLogger(MessageConsumerExample.class); + private static final Logger logger = + LoggerFactory.getLogger(MessageConsumerExample.class); private static final MsgRecvStats msgRecvStats = new MsgRecvStats(); private final PushMessageConsumer messageConsumer; private final MessageSessionFactory messageSessionFactory; - public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception { + public MessageConsumerExample(String masterHostAndPort, + String group, int fetchThreadCnt) throws Exception { ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group); consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); - if (fetchCount > 0) { - consumerConfig.setPushFetchThreadCnt(fetchCount); + if (fetchThreadCnt > 0) { + consumerConfig.setPushFetchThreadCnt(fetchThreadCnt); } this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig); } + public static void main(String[] args) { - final String masterHostAndPort = args[0]; + final String masterServers = args[0]; final String topics = args[1]; final String group = args[2]; - final int consumerCount = Integer.parseInt(args[3]); - int fetchCount = -1; + final int clientCount = Integer.parseInt(args[3]); + int threadCnt = -1; if (args.length > 5) { - fetchCount = Integer.parseInt(args[4]); - } - final Map<String, TreeSet<String>> topicTidsMap = new HashMap<>(); - - String[] topicTidsList = topics.split(","); - for (String topicTids : topicTidsList) { - String[] topicTidStr = topicTids.split(":"); - TreeSet<String> tids = null; - if (topicTidStr.length > 1) { - String tidsStr = topicTidStr[1]; - String[] tidsSet = tidsStr.split(";"); - if (tidsSet.length > 0) { - tids = new TreeSet<>(Arrays.asList(tidsSet)); - } - } - topicTidsMap.put(topicTidStr[0], tids); + threadCnt = Integer.parseInt(args[4]); } - final int startFetchCount = fetchCount; - final ExecutorService executorService = Executors.newCachedThreadPool(); - for (int i = 0; i < consumerCount; i++) { + final int fetchThreadCnt = threadCnt; + final Map<String, TreeSet<String>> topicAndFiltersMap = + MixedUtils.parseTopicParam(topics); + final ExecutorService executorService = + Executors.newCachedThreadPool(); + for (int i = 0; i < clientCount; i++) { executorService.submit(new Runnable() { @Override public void run() { try { - MessageConsumerExample messageConsumer = new MessageConsumerExample( - masterHostAndPort, - group, - startFetchCount - ); - messageConsumer.subscribe(topicTidsMap); + MessageConsumerExample messageConsumer = + new MessageConsumerExample(masterServers, + group, fetchThreadCnt); + messageConsumer.subscribe(topicAndFiltersMap); } catch (Exception e) { logger.error("Create consumer failed!", e); } @@ -126,8 +114,8 @@ public final class MessageConsumerExample { msgRecvStats.stopStats(); } - public void subscribe(Map<String, TreeSet<String>> topicTidsMap) throws TubeClientException { - for (Map.Entry<String, TreeSet<String>> entry : topicTidsMap.entrySet()) { + public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException { + for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) { MessageV2Listener messageListener = new DefaultMessageListener(entry.getKey()); messageConsumer.subscribe(entry.getKey(), entry.getValue(), messageListener); } diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java index 57d0ef0..7c29a63 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java @@ -19,7 +19,7 @@ package org.apache.tubemq.example; import java.nio.ByteBuffer; import java.text.SimpleDateFormat; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; @@ -36,7 +36,9 @@ import org.apache.tubemq.client.producer.MessageProducer; import org.apache.tubemq.client.producer.MessageSentCallback; import org.apache.tubemq.client.producer.MessageSentResult; import org.apache.tubemq.corebase.Message; +import org.apache.tubemq.corebase.utils.MixedUtils; import org.apache.tubemq.corebase.utils.ThreadUtils; +import org.apache.tubemq.corebase.utils.Tuple2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,32 +50,40 @@ import org.slf4j.LoggerFactory; */ public final class MessageProducerExample { - private static final Logger logger = LoggerFactory.getLogger(MessageProducerExample.class); - private static final ConcurrentHashMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>(); + private static final Logger logger = + LoggerFactory.getLogger(MessageProducerExample.class); + private static final ConcurrentHashMap<String, AtomicLong> counterMap = + new ConcurrentHashMap<>(); - private final String[] arrayKey = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh"}; - private final Set<String> filters = new TreeSet<>(); private final MessageProducer messageProducer; private final MessageSessionFactory messageSessionFactory; - private int keyCount = 0; - private int sentCount = 0; - - public MessageProducerExample(String masterHostAndPort) throws Exception { - filters.add("aaa"); - filters.add("bbb"); - - TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort); + public MessageProducerExample(String masterServers) throws Exception { + TubeClientConfig clientConfig = new TubeClientConfig(masterServers); this.messageSessionFactory = new TubeSingleSessionFactory(clientConfig); this.messageProducer = messageSessionFactory.createProducer(); } public static void main(String[] args) { - final String masterHostAndPort = args[0]; + // get and initial parameters + final String masterServers = args[0]; final String topics = args[1]; - final List<String> topicList = Arrays.asList(topics.split(",")); - final int count = Integer.parseInt(args[2]); - + final long msgCount = Long.parseLong(args[2]); + final Map<String, TreeSet<String>> topicAndFiltersMap = + MixedUtils.parseTopicParam(topics); + // initial send target + final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>(); + // initial topic send round + for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) { + if (entry.getValue().isEmpty()) { + topicSendRounds.add(new Tuple2<String, String>(entry.getKey())); + } else { + for (String filter : entry.getValue()) { + topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter)); + } + } + } + // initial sent data String body = "This is a test message from single-session-factory."; byte[] bodyBytes = StringUtils.getBytesUtf8(body); final ByteBuffer dataBuffer = ByteBuffer.allocate(1024); @@ -82,32 +92,40 @@ public final class MessageProducerExample { dataBuffer.put(bodyBytes, offset, Math.min(dataBuffer.remaining(), bodyBytes.length)); } dataBuffer.flip(); + // send messages try { - MessageProducerExample messageProducer = new MessageProducerExample(masterHostAndPort); - messageProducer.publishTopics(topicList); - for (int i = 0; i < count; i++) { - for (String topic : topicList) { - try { - // next line sends message synchronously, which is not recommended - // messageProducer.sendMessage(topic, body.getBytes()); - - // send message asynchronously, recommended - messageProducer.sendMessageAsync( - i, - topic, - dataBuffer.array(), - messageProducer.new DefaultSendCallback() - ); - } catch (Throwable e1) { - logger.error("Send Message throw exception ", e1); - } + long sentCount = 0; + int roundIndex = 0; + int targetCnt = topicSendRounds.size(); + MessageProducerExample messageProducer = + new MessageProducerExample(masterServers); + messageProducer.publishTopics(topicAndFiltersMap.keySet()); + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm"); + while (msgCount < 0 || sentCount < msgCount) { + roundIndex = (int) (sentCount++ % targetCnt); + Tuple2<String, String> target = topicSendRounds.get(roundIndex); + Message message = new Message(target.f0, body.getBytes()); + long currTimeMillis = System.currentTimeMillis(); + message.setAttrKeyVal("index", String.valueOf(sentCount)); + message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis)); + if (target.f1 != null) { + message.putSystemHeader(target.f1, sdf.format(new Date(currTimeMillis))); } - - if (i % 20000 == 0) { + try { + // 1.1 next line sends message synchronously, which is not recommended + // messageProducer.sendMessage(message); + // 1.2 send message asynchronously, recommended + messageProducer.sendMessageAsync(message, + messageProducer.new DefaultSendCallback()); + } catch (Throwable e1) { + logger.error("Send Message throw exception ", e1); + } + // only for test, delay inflight message's count + if (sentCount % 20000 == 0) { ThreadUtils.sleep(4000); - } else if (i % 10000 == 0) { + } else if (sentCount % 10000 == 0) { ThreadUtils.sleep(2000); - } else if (i % 2500 == 0) { + } else if (sentCount % 2500 == 0) { ThreadUtils.sleep(300); } } @@ -124,51 +142,33 @@ public final class MessageProducerExample { } } - public void publishTopics(List<String> topicList) throws TubeClientException { - this.messageProducer.publish(new TreeSet<>(topicList)); + public void publishTopics(Set<String> topicSet) throws TubeClientException { + this.messageProducer.publish(topicSet); } /** * Send message synchronous. */ - public void sendMessage(String topic, byte[] body) { + public void sendMessage(Message message) { // date format is accurate to minute, not to second - SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm"); - long currTimeMillis = System.currentTimeMillis(); - Message message = new Message(topic, body); - message.setAttrKeyVal("index", String.valueOf(1)); - message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis)); - message.putSystemHeader("test", sdf.format(new Date(currTimeMillis))); try { MessageSentResult result = messageProducer.sendMessage(message); if (!result.isSuccess()) { - logger.error("Send message failed!" + result.getErrMsg()); + logger.error("Sync-send message failed!" + result.getErrMsg()); } } catch (TubeClientException | InterruptedException e) { - logger.error("Send message failed!", e); + logger.error("Sync-send message failed!", e); } } /** * Send message asynchronous. More efficient and recommended. */ - public void sendMessageAsync(int id, String topic, byte[] body, MessageSentCallback callback) { - Message message = new Message(topic, body); - - // date format is accurate to minute, not to second - SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm"); - long currTimeMillis = System.currentTimeMillis(); - message.setAttrKeyVal("index", String.valueOf(1)); - String keyCode = arrayKey[sentCount++ % arrayKey.length]; - message.putSystemHeader(keyCode, sdf.format(new Date(currTimeMillis))); - if (filters.contains(keyCode)) { - keyCount++; - } + public void sendMessageAsync(Message message, MessageSentCallback callback) { try { - message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis)); messageProducer.sendMessage(message, callback); } catch (TubeClientException | InterruptedException e) { - logger.error("Send message failed!", e); + logger.error("Async-send message failed!", e); } } @@ -187,9 +187,8 @@ public final class MessageProducerExample { currCount = tmpCount; } } - if (currCount.incrementAndGet() % 1000 == 0) { - logger.info("Send " + topicName + " " + currCount.get() + " message, keyCount is " + keyCount); + logger.info("Send " + topicName + " " + currCount.get() + " message!"); } } else { logger.error("Send message failed!" + result.getErrMsg()); diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java index 16c45b2..36fcaa2 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java @@ -17,9 +17,10 @@ package org.apache.tubemq.example; -import java.util.Arrays; +import static org.apache.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET; import java.util.List; import java.util.Map; +import java.util.TreeSet; import org.apache.tubemq.client.config.ConsumerConfig; import org.apache.tubemq.client.consumer.ConsumeOffsetInfo; import org.apache.tubemq.client.consumer.ConsumePosition; @@ -29,9 +30,11 @@ import org.apache.tubemq.client.exception.TubeClientException; import org.apache.tubemq.client.factory.MessageSessionFactory; import org.apache.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.tubemq.corebase.Message; +import org.apache.tubemq.corebase.utils.MixedUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * This demo shows how to consume message by pull. * @@ -42,7 +45,8 @@ import org.slf4j.LoggerFactory; */ public final class MessagePullConsumerExample { - private static final Logger logger = LoggerFactory.getLogger(MessagePullConsumerExample.class); + private static final Logger logger = + LoggerFactory.getLogger(MessagePullConsumerExample.class); private static final MsgRecvStats msgRecvStats = new MsgRecvStats(); private final PullMessageConsumer messagePullConsumer; @@ -56,39 +60,37 @@ public final class MessagePullConsumerExample { } public static void main(String[] args) throws Throwable { - final String masterHostAndPort = args[0]; + // get and initial parameters + final String masterServers = args[0]; final String topics = args[1]; final String group = args[2]; - final int consumeCount = Integer.parseInt(args[3]); - - final MessagePullConsumerExample messageConsumer = new MessagePullConsumerExample( - masterHostAndPort, - group - ); - - final List<String> topicList = Arrays.asList(topics.split(",")); - messageConsumer.subscribe(topicList); - long startTime = System.currentTimeMillis(); - + final int msgCount = Integer.parseInt(args[3]); + final Map<String, TreeSet<String>> topicAndFiltersMap = + MixedUtils.parseTopicParam(topics); + // initial consumer object + final MessagePullConsumerExample messageConsumer = + new MessagePullConsumerExample(masterServers, group); + messageConsumer.subscribe(topicAndFiltersMap); Thread[] fetchRunners = new Thread[3]; for (int i = 0; i < fetchRunners.length; i++) { - fetchRunners[i] = new Thread(new FetchRequestRunner(messageConsumer, consumeCount)); + fetchRunners[i] = new Thread(new FetchRequestRunner(messageConsumer, msgCount)); fetchRunners[i].setName("_fetch_runner_" + i); } - + // initial fetch threads for (Thread thread : fetchRunners) { thread.start(); } - - Thread statisticThread = new Thread(msgRecvStats, "Sent Statistic Thread"); + // initial statistic thread + Thread statisticThread = + new Thread(msgRecvStats, "Sent Statistic Thread"); statisticThread.start(); } - public void subscribe(List<String> topicList) throws TubeClientException { - for (String topic : topicList) { - messagePullConsumer.subscribe(topic, null); + public void subscribe( + Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException { + for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) { + messagePullConsumer.subscribe(entry.getKey(), entry.getValue()); } - messagePullConsumer.completeSubscribe(); } @@ -96,7 +98,8 @@ public final class MessagePullConsumerExample { return messagePullConsumer.getMessage(); } - public ConsumerResult confirmConsume(final String confirmContext, boolean isConsumed) throws TubeClientException { + public ConsumerResult confirmConsume(final String confirmContext, + boolean isConsumed) throws TubeClientException { return messagePullConsumer.confirmConsume(confirmContext, isConsumed); } @@ -109,9 +112,9 @@ public final class MessagePullConsumerExample { final MessagePullConsumerExample messageConsumer; final int consumeCount; - FetchRequestRunner(final MessagePullConsumerExample messageConsumer, int count) { + FetchRequestRunner(final MessagePullConsumerExample messageConsumer, int msgCount) { this.messageConsumer = messageConsumer; - this.consumeCount = count; + this.consumeCount = msgCount; } @Override @@ -127,16 +130,10 @@ public final class MessagePullConsumerExample { } messageConsumer.confirmConsume(result.getConfirmContext(), true); } else { - if (!(result.getErrCode() == 400 - || result.getErrCode() == 404 - || result.getErrCode() == 405 - || result.getErrCode() == 406 - || result.getErrCode() == 407 - || result.getErrCode() == 408)) { + if (!IGNORE_ERROR_SET.contains(result.getErrCode())) { logger.info( "Receive messages errorCode is {}, Error message is {}", - result.getErrCode(), - result.getErrMsg()); + result.getErrCode(), result.getErrMsg()); } } if (consumeCount > 0) { diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java index d3afeaa..3d49478 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java @@ -17,7 +17,7 @@ package org.apache.tubemq.example; -import java.util.Arrays; +import static org.apache.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET; import java.util.List; import java.util.Map; import java.util.TreeSet; @@ -34,9 +34,11 @@ import org.apache.tubemq.client.exception.TubeClientException; import org.apache.tubemq.client.factory.MessageSessionFactory; import org.apache.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.tubemq.corebase.Message; +import org.apache.tubemq.corebase.utils.MixedUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * This demo shows how to reset offset on consuming. The main difference from {@link MessagePullConsumerExample} * is that we call {@link PullMessageConsumer#completeSubscribe(String, int, boolean, Map)} instead of @@ -45,24 +47,32 @@ import org.slf4j.LoggerFactory; */ public final class MessagePullSetConsumerExample { - private static final Logger logger = LoggerFactory.getLogger(MessagePullSetConsumerExample.class); + private static final Logger logger = + LoggerFactory.getLogger(MessagePullSetConsumerExample.class); private static final AtomicLong counter = new AtomicLong(0); private final PullMessageConsumer messagePullConsumer; private final MessageSessionFactory messageSessionFactory; - public MessagePullSetConsumerExample(String masterHostAndPort, String group) throws Exception { + public MessagePullSetConsumerExample(String masterHostAndPort, + String group) throws Exception { ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group); this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig); } public static void main(String[] args) { - final String masterHostAndPort = args[0]; + // get and initial parameters + final String masterServers = args[0]; final String topics = args[1]; final String group = args[2]; - final int consumeCount = Integer.parseInt(args[3]); - final Map<String, Long> partOffsetMap = new ConcurrentHashMap<>(); + final int msgCount = Integer.parseInt(args[3]); + final Map<String, TreeSet<String>> topicAndFiltersMap = + MixedUtils.parseTopicParam(topics); + // initial reset offset parameters + // (The offset specified is only a demo) + final Map<String, Long> partOffsetMap = + new ConcurrentHashMap<>(); partOffsetMap.put("123:test_1:0", 0L); partOffsetMap.put("123:test_1:1", 0L); partOffsetMap.put("123:test_1:2", 0L); @@ -70,17 +80,15 @@ public final class MessagePullSetConsumerExample { partOffsetMap.put("123:test_2:1", 350L); partOffsetMap.put("123:test_2:2", 350L); - final List<String> topicList = Arrays.asList(topics.split(",")); - ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(new Runnable() { @Override public void run() { try { - int getCount = consumeCount; + int getCount = msgCount; MessagePullSetConsumerExample messageConsumer = - new MessagePullSetConsumerExample(masterHostAndPort, group); - messageConsumer.subscribe(topicList, partOffsetMap); + new MessagePullSetConsumerExample(masterServers, group); + messageConsumer.subscribe(topicAndFiltersMap, partOffsetMap); // main logic of consuming do { ConsumerResult result = messageConsumer.getMessage(); @@ -125,19 +133,13 @@ public final class MessagePullSetConsumerExample { confirmResult.getErrMsg()); } } else { - if (!(result.getErrCode() == 400 - || result.getErrCode() == 404 - || result.getErrCode() == 405 - || result.getErrCode() == 406 - || result.getErrCode() == 407 - || result.getErrCode() == 408)) { + if (!IGNORE_ERROR_SET.contains(result.getErrCode())) { logger.info( "Receive messages errorCode is {}, Error message is {}", - result.getErrCode(), - result.getErrMsg()); + result.getErrCode(), result.getErrMsg()); } } - if (consumeCount >= 0) { + if (msgCount >= 0) { if (--getCount <= 0) { break; } @@ -157,24 +159,16 @@ public final class MessagePullSetConsumerExample { } } - public void subscribe( - List<String> topicList, - Map<String, Long> partOffsetMap - ) throws TubeClientException { - TreeSet<String> filters = new TreeSet<>(); - filters.add("aaa"); - filters.add("bbb"); - for (String topic : topicList) { - this.messagePullConsumer.subscribe(topic, filters); + public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap, + Map<String, Long> partOffsetMap) throws TubeClientException { + for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) { + messagePullConsumer.subscribe(entry.getKey(), entry.getValue()); } String sessionKey = "test_reset2"; int consumerCount = 2; boolean isSelectBig = false; - messagePullConsumer.completeSubscribe( - sessionKey, - consumerCount, - isSelectBig, - partOffsetMap); + messagePullConsumer.completeSubscribe(sessionKey, + consumerCount, isSelectBig, partOffsetMap); } public ConsumerResult getMessage() throws TubeClientException { diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java index 1b2f25a..f544829 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java @@ -42,6 +42,7 @@ import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.utils.MixedUtils; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.corebase.utils.ThreadUtils; +import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.server.common.fielddef.CliArgDef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +65,7 @@ public class CliProducer extends CliAbstractBase { // sent data content private static byte[] sentData; private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>(); - private final List<TupleValue> topicSendRounds = new ArrayList<>(); + private final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>(); private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>(); private final Map<MessageProducer, MsgSender> producerMap = new HashMap<>(); // cli parameters @@ -189,10 +190,10 @@ public class CliProducer extends CliAbstractBase { // initial topic send round for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) { if (entry.getValue().isEmpty()) { - topicSendRounds.add(new TupleValue(entry.getKey())); + topicSendRounds.add(new Tuple2<String, String>(entry.getKey())); } else { for (String filter : entry.getValue()) { - topicSendRounds.add(new TupleValue(entry.getKey(), filter)); + topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter)); } } } @@ -266,11 +267,11 @@ public class CliProducer extends CliAbstractBase { roundIndex = (int) (sentCount++ % topicAndCondCnt); try { long millis = System.currentTimeMillis(); - TupleValue tupleValue = topicSendRounds.get(roundIndex); - Message message = new Message(tupleValue.topic, sentData); - if (tupleValue.filter != null) { + Tuple2<String, String> target = topicSendRounds.get(roundIndex); + Message message = new Message(target.f0, sentData); + if (target.f1 != null) { // if include filter, add filter item - message.putSystemHeader(tupleValue.filter, sdf.format(new Date(millis))); + message.putSystemHeader(target.f1, sdf.format(new Date(millis))); } // use sync or async process if (syncProduction) { @@ -332,21 +333,6 @@ public class CliProducer extends CliAbstractBase { } } - private static class TupleValue { - public String topic = null; - public String filter = null; - - public TupleValue(String topic) { - this.topic = topic; - } - - public TupleValue(String topic, String filter) { - this.topic = topic; - this.filter = filter; - } - - } - public static void main(String[] args) { CliProducer cliProducer = new CliProducer(); try {
