http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/message/MessageQueueMsg.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/message/MessageQueueMsg.java b/test/src/main/java/org/apache/rocketmq/test/message/MessageQueueMsg.java new file mode 100644 index 0000000..38c2b6b --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/message/MessageQueueMsg.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.message; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.factory.MQMessageFactory; + +public class MessageQueueMsg { + private Map<MessageQueue, List<Object>> msgsWithMQ = null; + private Map<Integer, List<Object>> msgsWithMQId = null; + private Collection<Object> msgBodys = null; + + public MessageQueueMsg(List<MessageQueue> mqs, int msgSize) { + this(mqs, msgSize, null); + } + + public MessageQueueMsg(List<MessageQueue> mqs, int msgSize, String tag) { + msgsWithMQ = MQMessageFactory.getMsgByMQ(mqs, msgSize, tag); + msgsWithMQId = new HashMap<Integer, List<Object>>(); + msgBodys = new ArrayList<Object>(); + init(); + } + + public Map<MessageQueue, List<Object>> getMsgsWithMQ() { + return msgsWithMQ; + } + + public Map<Integer, List<Object>> getMsgWithMQId() { + return msgsWithMQId; + } + + public Collection<Object> getMsgBodys() { + return msgBodys; + } + + private void init() { + for (MessageQueue mq : msgsWithMQ.keySet()) { + msgsWithMQId.put(mq.getQueueId(), msgsWithMQ.get(mq)); + msgBodys.addAll(MQMessageFactory.getMessageBody(msgsWithMQ.get(mq))); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/sendresult/SendResult.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/sendresult/SendResult.java b/test/src/main/java/org/apache/rocketmq/test/sendresult/SendResult.java new file mode 100644 index 0000000..d53ee7d --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/sendresult/SendResult.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Mutable value object describing the outcome of one send attempt: a success flag, the
 * message id, the exception (if any) and the broker address.
 */
public class SendResult {
    private boolean sendResult;
    private String msgId;
    private Exception sendException;
    private String brokerIp;

    /** @return the success flag; {@code false} until set otherwise */
    public boolean isSendResult() {
        return this.sendResult;
    }

    /** @param sendResult the success flag to store */
    public void setSendResult(boolean sendResult) {
        this.sendResult = sendResult;
    }

    /** @return the stored message id, or {@code null} if none was set */
    public String getMsgId() {
        return this.msgId;
    }

    /** @param msgId the message id to store */
    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    /** @return the stored exception, or {@code null} if none was set */
    public Exception getSendException() {
        return this.sendException;
    }

    /** @param sendException the exception to store */
    public void setSendException(Exception sendException) {
        this.sendException = sendException;
    }

    /** @return the stored broker address, or {@code null} if none was set */
    public String getBrokerIp() {
        return this.brokerIp;
    }

    /** @param brokerIp the broker address to store */
    public void setBrokerIp(String brokerIp) {
        this.brokerIp = brokerIp;
    }

    /** Renders only the success flag and the message id. */
    @Override
    public String toString() {
        Object[] parts = {Boolean.valueOf(this.sendResult), this.msgId};
        return String.format("sendstatus:%s msgId:%s", parts);
    }
}
/**
 * A yes/no check with no arguments, implemented by callers to describe a state they are
 * waiting for.
 */
public interface Condition {
    /**
     * Evaluates the condition once.
     *
     * @return {@code true} if the condition currently holds, {@code false} otherwise
     */
    boolean meetCondition();
}
/**
 * Finds messages that occur more than once across a list of queues and prints a report.
 *
 * <p>Queues are scanned position by position (index 0 of every queue, then index 1, ...).
 * The first occurrence of a message is remembered together with the index of the queue it
 * was seen in; every later occurrence is counted as a duplicate of the queue it shows up in.
 *
 * @param <T> message (or message id) type; used as a {@link HashMap} key, so it must
 *            implement {@code equals}/{@code hashCode} consistently
 */
public class DuplicateMessageInfo<T> {

    /** Report file used by the two-argument overload (legacy location, kept for compatibility). */
    private static final String DEFAULT_LOG_FILE = "D:" + File.separator + "checkDuplicatedMessageInfo.txt";

    /**
     * Prints the duplicate report to {@code System.out} and, when requested, appends it to
     * the legacy default report file.
     *
     * @param bPrintLog whether to also append the report to the default report file
     * @param lQueueList one list of messages per queue
     * @throws IOException if the report file cannot be written
     */
    public void checkDuplicatedMessageInfo(boolean bPrintLog,
        List<List<T>> lQueueList) throws IOException {
        checkDuplicatedMessageInfo(bPrintLog, lQueueList, new File(DEFAULT_LOG_FILE));
    }

    /**
     * Prints the duplicate report to {@code System.out} and, when requested, appends it to
     * {@code logFile}.
     *
     * @param bPrintLog whether to also append the report to {@code logFile}
     * @param lQueueList one list of messages per queue
     * @param logFile file the report is appended to when {@code bPrintLog} is {@code true}
     * @throws IOException if {@code logFile} cannot be written
     */
    public void checkDuplicatedMessageInfo(boolean bPrintLog, List<List<T>> lQueueList,
        File logFile) throws IOException {
        int queueCount = lQueueList.size();
        int longestQueue = 0;
        for (List<T> queue : lQueueList) {
            longestQueue = Math.max(longestQueue, queue.size());
        }

        Map<T, Integer> firstSeenInQueue = new HashMap<T, Integer>();
        Map<Integer, Integer> dupCountByQueue = new HashMap<Integer, Integer>();
        StringBuilder[] dupDetails = new StringBuilder[queueCount];
        for (int q = 0; q < queueCount; q++) {
            dupDetails[q] = new StringBuilder();
        }

        for (int pos = 0; pos < longestQueue; pos++) {
            for (int q = 0; q < queueCount; q++) {
                List<T> queue = lQueueList.get(q);
                if (pos >= queue.size()) {
                    continue;
                }
                T msg = queue.get(pos);
                if (firstSeenInQueue.containsKey(msg)) {
                    Integer seenSoFar = dupCountByQueue.get(q);
                    dupCountByQueue.put(q, seenSoFar == null ? 1 : seenSoFar + 1);
                    // detail row: <this queue> <queue of first occurrence> <message>
                    dupDetails[q].append(q).append("\t")
                        .append(firstSeenInQueue.get(msg)).append("\t")
                        .append(msg).append("\r\n");
                } else {
                    firstSeenInQueue.put(msg, q);
                }
            }
        }

        int msgTotalNum = getMsgTotalNumber(lQueueList);
        int msgTotalDupNum = getDuplicateMsgNum(dupCountByQueue);
        int msgNoDupNum = msgTotalNum - msgTotalDupNum;

        StringBuilder summary = new StringBuilder();
        summary.append("msgTotalNum:").append(msgTotalNum).append("\r\n");
        summary.append("msgTotalDupNum:").append(msgTotalDupNum).append("\r\n");
        summary.append("msgNoDupNum:").append(msgNoDupNum).append("\r\n");
        summary.append("msgDupRate:")
            .append(getFloatNumString(percentage(msgTotalDupNum, msgTotalNum))).append("%\r\n");

        summary.append("queue\tmsg(dupNum/dupRate)\tdupRate\r\n");
        // Walk the queue indexes rather than 0..map.size(): only queues that actually had
        // duplicates have an entry, and those indexes need not be contiguous from zero.
        for (int q = 0; q < queueCount; q++) {
            Integer dupNum = dupCountByQueue.get(q);
            if (dupNum == null) {
                continue;
            }
            float shareOfAllDups = percentage(dupNum, msgTotalDupNum);
            float shareOfQueue = percentage(dupNum, lQueueList.get(q).size());
            summary.append(q).append("\t").append(dupNum).append("/")
                .append(getFloatNumString(shareOfAllDups)).append("%").append("\t\t")
                .append(getFloatNumString(shareOfQueue)).append("%\r\n");
        }

        String titleString = "queue\tdupQueue\tdupMsg\r\n";
        System.out.print(summary.toString());
        System.out.print(titleString);
        for (int q = 0; q < queueCount; q++) {
            System.out.print(dupDetails[q].toString());
        }

        if (bPrintLog) {
            StringBuilder report = new StringBuilder(summary.toString()).append(titleString);
            for (int q = 0; q < queueCount; q++) {
                report.append(dupDetails[q].toString()).append("\r\n");
            }
            OutputStream out = new FileOutputStream(logFile, true);
            try {
                out.write(report.toString().getBytes());
            } finally {
                // close even when write fails so the file handle is not leaked
                out.close();
            }
        }
    }

    /** Sums the sizes of all queues. */
    private int getMsgTotalNumber(List<List<T>> lQueueList) {
        int msgTotalNum = 0;
        for (List<T> queue : lQueueList) {
            msgTotalNum += queue.size();
        }
        return msgTotalNum;
    }

    /** Sums the per-queue duplicate counts. */
    private int getDuplicateMsgNum(Map<Integer, Integer> msgDupMap) {
        int msgDupNum = 0;
        for (Integer count : msgDupMap.values()) {
            msgDupNum += count;
        }
        return msgDupNum;
    }

    /** Returns {@code part / whole} as a percentage, or 0 when {@code whole} is 0. */
    private float percentage(int part, int whole) {
        if (whole == 0) {
            return 0.0f;
        }
        return ((float) part / (float) whole) * 100.0f;
    }

    /** Formats a number with exactly two fraction digits. */
    private String getFloatNumString(float fNum) {
        DecimalFormat dcmFmt = new DecimalFormat("0.00");
        return dcmFmt.format(fNum);
    }
}
/**
 * Small helper that writes text (or a {@link Properties} dump) to one file identified by a
 * directory path and a file name. I/O failures are reported on stderr and otherwise ignored,
 * so every method is best-effort.
 */
public class FileUtil {
    private static String lineSeperator = System.getProperty("line.separator");

    private String filePath = "";
    private String fileName = "";

    /**
     * @param filePath directory that contains (or will contain) the file
     * @param fileName name of the file inside {@code filePath}
     */
    public FileUtil(String filePath, String fileName) {
        this.filePath = filePath;
        this.fileName = fileName;
    }

    /** Manual smoke test: writes two properties to {@code test.txt} on the class path root. */
    public static void main(String[] args) {
        String filePath = FileUtil.class.getResource("/").getPath();
        String fileName = "test.txt";
        FileUtil fileUtil = new FileUtil(filePath, fileName);
        Properties properties = new Properties();
        properties.put("xx", "yy");
        properties.put("yy", "xx");
        fileUtil.writeProperties(properties);
    }

    /** Deletes the file if it exists; a failed delete is reported on stderr. */
    public void deleteFile() {
        File file = targetFile();
        if (file.exists() && !file.delete()) {
            System.err.println("failed to delete file: " + file.getAbsolutePath());
        }
    }

    /**
     * Appends a line separator followed by {@code content}, creating the file if needed.
     *
     * @param content text to append
     */
    public void appendFile(String content) {
        File file = openFile();
        writeFile(file, lineSeperator + content, true);
    }

    /**
     * Replaces the file's content with {@code content}, creating the file if needed.
     *
     * @param content new file content
     */
    public void coverFile(String content) {
        File file = openFile();
        writeFile(file, content, false);
    }

    /**
     * Replaces the file's content with one {@code key=value} line per property.
     *
     * @param properties entries to write
     */
    public void writeProperties(Properties properties) {
        this.coverFile(getPropertiesAsString(properties));
    }

    /** Renders every entry as {@code key=value} followed by the platform line separator. */
    private String getPropertiesAsString(Properties properties) {
        StringBuilder sb = new StringBuilder();
        for (Object key : properties.keySet()) {
            // read through the map view so a non-String key or value cannot trigger a
            // ClassCastException the way the (String) cast did
            sb.append(key).append("=").append(properties.get(key)).append(lineSeperator);
        }
        return sb.toString();
    }

    /** Writes {@code content} to {@code file}, appending or truncating as requested. */
    private void writeFile(File file, String content, boolean append) {
        FileWriter writer = null;
        try {
            writer = new FileWriter(file.getAbsoluteFile(), append);
            writer.write(content);
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Returns the target file, creating it (and any missing parent directories) first. */
    private File openFile() {
        File file = targetFile();
        if (!file.exists()) {
            File parent = file.getAbsoluteFile().getParentFile();
            if (parent != null && !parent.exists()) {
                // result deliberately unchecked: createNewFile below reports the failure
                parent.mkdirs();
            }
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return file;
    }

    /** Builds the {@link File} for {@code filePath} + separator + {@code fileName}. */
    private File targetFile() {
        return new File(filePath + File.separator + fileName);
    }
}
+ */ + +package org.apache.rocketmq.test.util; + +import java.util.HashMap; +import java.util.Set; +import org.apache.log4j.Logger; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; + +public class MQAdmin { + private static Logger log = Logger.getLogger(MQAdmin.class); + + public static boolean createTopic(String nameSrvAddr, String clusterName, String topic, + int queueNum) { + int defaultWaitTime = 5; + return createTopic(nameSrvAddr, clusterName, topic, queueNum, defaultWaitTime); + } + + public static boolean createTopic(String nameSrvAddr, String clusterName, String topic, + int queueNum, int waitTimeSec) { + boolean createResult = false; + DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); + mqAdminExt.setNamesrvAddr(nameSrvAddr); + try { + mqAdminExt.start(); + mqAdminExt.createTopic(clusterName, topic, queueNum); + } catch (Exception e) { + e.printStackTrace(); + } + + long startTime = System.currentTimeMillis(); + while (!createResult) { + createResult = checkTopicExist(mqAdminExt, topic); + if (System.currentTimeMillis() - startTime < waitTimeSec * 1000) { + TestUtils.waitForMonment(100); + } else { + log.error(String.format("timeout,but create topic[%s] failed!", topic)); + break; + } + } + + mqAdminExt.shutdown(); + return createResult; + } + + private static boolean checkTopicExist(DefaultMQAdminExt mqAdminExt, String topic) { + boolean createResult = false; + try { + TopicStatsTable topicInfo = mqAdminExt.examineTopicStats(topic); + createResult = !topicInfo.getOffsetTable().isEmpty(); + } catch (Exception e) { + } + + return createResult; + } + + public static boolean createSub(String nameSrvAddr, String clusterName, String 
consumerId) { + boolean createResult = true; + DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); + mqAdminExt.setNamesrvAddr(nameSrvAddr); + SubscriptionGroupConfig config = new SubscriptionGroupConfig(); + config.setGroupName(consumerId); + try { + mqAdminExt.start(); + Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, + clusterName); + for (String addr : masterSet) { + try { + mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, config); + log.info(String.format("create subscription group %s to %s success.\n", consumerId, + addr)); + } catch (Exception e) { + e.printStackTrace(); + Thread.sleep(1000 * 1); + } + } + } catch (Exception e) { + createResult = false; + e.printStackTrace(); + } + mqAdminExt.shutdown(); + return createResult; + } + + public static ClusterInfo getCluster(String nameSrvAddr) { + DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); + mqAdminExt.setNamesrvAddr(nameSrvAddr); + ClusterInfo clusterInfo = null; + try { + mqAdminExt.start(); + clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + } catch (Exception e) { + e.printStackTrace(); + } + mqAdminExt.shutdown(); + return clusterInfo; + } + + public static boolean isBrokerExist(String ns, String ip) { + ClusterInfo clusterInfo = getCluster(ns); + if (clusterInfo == null) { + return false; + } else { + HashMap<String, BrokerData> brokers = clusterInfo.getBrokerAddrTable(); + for (String brokerName : brokers.keySet()) { + HashMap<Long, String> brokerIps = brokers.get(brokerName).getBrokerAddrs(); + for (long brokerId : brokerIps.keySet()) { + if (brokerIps.get(brokerId).contains(ip)) + return true; + } + } + } + + return false; + } + + public void getSubConnection(String nameSrvAddr, String clusterName, String consumerId) { + boolean createResult = true; + DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); + mqAdminExt.setNamesrvAddr(nameSrvAddr); + SubscriptionGroupConfig config = new SubscriptionGroupConfig(); + 
config.setGroupName(consumerId); + try { + mqAdminExt.start(); + Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, + clusterName); + for (String addr : masterSet) { + try { + + System.out.printf("create subscription group %s to %s success.\n", consumerId, + addr); + } catch (Exception e) { + e.printStackTrace(); + Thread.sleep(1000 * 1); + } + } + } catch (Exception e) { + createResult = false; + e.printStackTrace(); + } + mqAdminExt.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/MQRandomUtils.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQRandomUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQRandomUtils.java new file mode 100644 index 0000000..1d82445 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQRandomUtils.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.util; + +public class MQRandomUtils { + public static String getRandomTopic() { + return RandomUtils.getStringByUUID(); + } + + public static String getRandomConsumerGroup() { + return RandomUtils.getStringByUUID(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/MQWait.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQWait.java b/test/src/main/java/org/apache/rocketmq/test/util/MQWait.java new file mode 100644 index 0000000..6edeeca --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQWait.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.listener.AbstractListener; + +import static com.google.common.truth.Truth.assertThat; + +public class MQWait { + private static Logger logger = Logger.getLogger(MQWait.class); + + public static boolean waitConsumeAll(int timeoutMills, Collection<Object> allSendMsgs, + AbstractListener... listeners) { + boolean recvAll = false; + long startTime = System.currentTimeMillis(); + Collection<Object> noDupMsgs = new ArrayList<Object>(); + while (!recvAll) { + if ((System.currentTimeMillis() - startTime) < timeoutMills) { + noDupMsgs.clear(); + try { + for (AbstractListener listener : listeners) { + Collection<Object> recvMsgs = Collections + .synchronizedCollection(listener.getAllUndupMsgBody()); + noDupMsgs.addAll(VerifyUtils.getFilterdMessage(allSendMsgs, recvMsgs)); + } + } catch (Exception e) { + e.printStackTrace(); + } + + try { + assertThat(noDupMsgs).containsAllIn(allSendMsgs); + recvAll = true; + break; + } catch (Throwable e) { + } + TestUtil.waitForMonment(500); + } else { + logger.error(String.format( + "timeout but still not receive all messages,expectSize[%s],realSize[%s]", + allSendMsgs.size(), noDupMsgs.size())); + break; + } + } + + return recvAll; + } + + public static void setCondition(Condition condition, int waitTimeMills, int intervalMills) { + long startTime = System.currentTimeMillis(); + while (!condition.meetCondition()) { + if (System.currentTimeMillis() - startTime > waitTimeMills) { + logger.error("time out,but contidion still not meet!"); + break; + } else { + TestUtil.waitForMonment(intervalMills); + } + } + } + + public static void main(String args[]) { + + long start = System.currentTimeMillis(); + MQWait.setCondition(new Condition() { + int i = 0; + + public boolean meetCondition() { + i++; + return i == 100; + } + }, 10 * 1000, 200); + 
+ } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/RandomUtil.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/util/RandomUtil.java b/test/src/main/java/org/apache/rocketmq/test/util/RandomUtil.java new file mode 100644 index 0000000..1c2bdac --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/util/RandomUtil.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Static helpers that produce random numbers, strings and selections for tests.
 * All randomness comes from one shared {@link Random}; nothing here is suitable for
 * security-sensitive use.
 */
public final class RandomUtil {

    /** First code point used for random CJK characters (inclusive). */
    private static final int UNICODE_START = '\u4E00';
    /** End of the code point range used for random CJK characters (exclusive). */
    private static final int UNICODE_END = '\u9FA0';
    private static Random rd = new Random();

    private RandomUtil() {

    }

    /** @return any {@code long} */
    public static long getLong() {
        return rd.nextLong();
    }

    /** @return a {@code long} greater than zero */
    public static long getLongMoreThanZero() {
        long res = rd.nextLong();
        while (res <= 0) {
            res = rd.nextLong();
        }
        return res;
    }

    /**
     * @param n modulus; must not be zero
     * @return a random {@code long} reduced modulo {@code n}; the result may be negative
     * @throws ArithmeticException if {@code n} is zero
     */
    public static long getLongLessThan(long n) {
        return rd.nextLong() % n;
    }

    /**
     * @param n exclusive upper bound (by magnitude); must not be -1, 0 or 1
     * @return a {@code long} in {@code [1, |n| - 1]}
     * @throws IllegalArgumentException if no such value exists
     */
    public static long getLongMoreThanZeroLessThan(long n) {
        if (n >= -1 && n <= 1) {
            // the retry loop below could never terminate for these bounds
            throw new IllegalArgumentException("no value in (0, " + n + ")");
        }
        long res = getLongLessThan(n);
        while (res <= 0) {
            res = getLongLessThan(n);
        }
        return res;
    }

    /**
     * @param n lower bound (inclusive)
     * @param m upper bound (exclusive)
     * @return a {@code long} in {@code [n, m)}, or {@code n} when {@code m <= n}
     */
    public static long getLongBetween(long n, long m) {
        if (m <= n) {
            return n;
        }
        return n + getLongMoreThanZero() % (m - n);
    }

    /** @return any {@code int} */
    public static int getInteger() {
        return rd.nextInt();
    }

    /** @return an {@code int} greater than zero */
    public static int getIntegerMoreThanZero() {
        int res = rd.nextInt();
        while (res <= 0) {
            res = rd.nextInt();
        }
        return res;
    }

    /**
     * @param n modulus; must not be zero
     * @return a random {@code int} reduced modulo {@code n}; the result may be negative
     * @throws ArithmeticException if {@code n} is zero
     */
    public static int getIntegerLessThan(int n) {
        return rd.nextInt() % n;
    }

    /**
     * @param n exclusive upper bound; must be at least 2
     * @return an {@code int} in {@code [1, n - 1]}
     * @throws IllegalArgumentException if {@code n < 2}
     */
    public static int getIntegerMoreThanZeroLessThan(int n) {
        if (n < 2) {
            // n == 1 used to spin forever because nextInt(1) is always 0
            throw new IllegalArgumentException("no value in (0, " + n + ")");
        }
        return 1 + rd.nextInt(n - 1);
    }

    /**
     * @param n lower bound (inclusive)
     * @param m upper bound (exclusive)
     * @return an {@code int} in {@code [n, m)}, or {@code n} when {@code m <= n}
     */
    public static int getIntegerBetween(int n, int m) {
        if (m <= n) {
            return n;
        }
        // widen to long: m - n overflows int for ranges wider than Integer.MAX_VALUE
        long span = (long) m - n;
        return (int) (n + getIntegerMoreThanZero() % span);
    }

    /**
     * Picks one character from the ranges in {@code arg}, which holds pairs of
     * {@code [start, endExclusive)} code points.
     */
    private static char getChar(int[] arg) {
        int pair = rd.nextInt(arg.length / 2) * 2;
        return (char) (getIntegerBetween(arg[pair], arg[pair + 1]));
    }

    /** Builds a string of {@code n} characters drawn from the ranges in {@code arg}. */
    private static String getString(int n, int[] arg) {
        StringBuilder res = new StringBuilder();
        for (int i = 0; i < n; i++) {
            res.append(getChar(arg));
        }
        return res.toString();
    }

    /** @return {@code n} random ASCII letters (empty when {@code n <= 0}) */
    public static String getStringWithCharacter(int n) {
        int[] arg = new int[] {'a', 'z' + 1, 'A', 'Z' + 1};
        return getString(n, arg);
    }

    /** @return {@code n} random ASCII digits (empty when {@code n <= 0}) */
    public static String getStringWithNumber(int n) {
        int[] arg = new int[] {'0', '9' + 1};
        return getString(n, arg);
    }

    /** @return {@code n} random ASCII letters or digits (empty when {@code n <= 0}) */
    public static String getStringWithNumAndCha(int n) {
        int[] arg = new int[] {'a', 'z' + 1, 'A', 'Z' + 1, '0', '9' + 1};
        return getString(n, arg);
    }

    /**
     * @param n exclusive upper bound for the length; must be at least 2
     * @return random letters, between 1 and {@code n - 1} of them
     */
    public static String getStringShortenThan(int n) {
        return getStringWithCharacter(getIntegerMoreThanZeroLessThan(n));
    }

    /**
     * @param n exclusive upper bound for the length; must be at least 2
     * @return random letters or digits, between 1 and {@code n - 1} of them
     */
    public static String getStringWithNumAndChaShortenThan(int n) {
        return getStringWithNumAndCha(getIntegerMoreThanZeroLessThan(n));
    }

    /** @return random letters with a length in {@code [n, m)} */
    public static String getStringBetween(int n, int m) {
        return getStringWithCharacter(getIntegerBetween(n, m));
    }

    /** @return random letters or digits with a length in {@code [n, m)} */
    public static String getStringWithNumAndChaBetween(int n, int m) {
        return getStringWithNumAndCha(getIntegerBetween(n, m));
    }

    /**
     * @return {@code prefix} padded with random letters to a total length of {@code n};
     *         just {@code prefix} when it is already that long
     */
    public static String getStringWithPrefix(int n, String prefix) {
        int missing = n - prefix.length();
        if (missing <= 0) {
            return prefix;
        }
        return prefix + getStringWithCharacter(missing);
    }

    /**
     * @return random letters followed by {@code suffix}, {@code n} characters in total;
     *         just {@code suffix} when it is already that long
     */
    public static String getStringWithSuffix(int n, String suffix) {
        int missing = n - suffix.length();
        if (missing <= 0) {
            return suffix;
        }
        return getStringWithCharacter(missing) + suffix;
    }

    /**
     * @return {@code prefix}, random letters, then {@code suffix}, {@code n} characters in
     *         total; {@code prefix + suffix} when that is already long enough
     */
    public static String getStringWithBoth(int n, String prefix, String suffix) {
        int missing = n - (prefix.length() + suffix.length());
        StringBuilder res = new StringBuilder(prefix);
        if (missing > 0) {
            res.append(getStringWithCharacter(missing));
        }
        return res.append(suffix).toString();
    }

    /** Like {@link #getStringWithPrefix(int, String)} but padded with random CJK characters. */
    public static String getCheseWordWithPrifix(int n, String prefix) {
        int missing = n - prefix.length();
        if (missing <= 0) {
            return prefix;
        }
        return prefix + getCheseWord(missing);
    }

    /** Like {@link #getStringWithSuffix(int, String)} but padded with random CJK characters. */
    public static String getCheseWordWithSuffix(int n, String suffix) {
        int missing = n - suffix.length();
        if (missing <= 0) {
            return suffix;
        }
        return getCheseWord(missing) + suffix;
    }

    /** Like {@link #getStringWithBoth(int, String, String)} but padded with random CJK characters. */
    public static String getCheseWordWithBoth(int n, String prefix, String suffix) {
        int missing = n - (prefix.length() + suffix.length());
        StringBuilder res = new StringBuilder(prefix);
        if (missing > 0) {
            res.append(getCheseWord(missing));
        }
        return res.append(suffix).toString();
    }

    /** @return {@code len} random characters from the CJK range U+4E00..U+9F9F */
    public static String getCheseWord(int len) {
        StringBuilder res = new StringBuilder();
        for (int i = 0; i < len; i++) {
            res.append(getCheseChar());
        }
        return res.toString();
    }

    private static char getCheseChar() {
        return (char) (UNICODE_START + rd.nextInt(UNICODE_END - UNICODE_START));
    }

    /** @return {@code true} or {@code false} with equal probability */
    public static boolean getBoolean() {
        return getIntegerMoreThanZeroLessThan(3) == 1;
    }

    /** @return the string form of a random {@link UUID} */
    public static String getStringByUUID() {
        return UUID.randomUUID().toString();
    }

    /**
     * Picks {@code n} distinct values from {@code [min, max]} (both inclusive) in random order.
     *
     * @return the picked values, or {@code null} when {@code max < min} or the range holds
     *         fewer than {@code n} values
     */
    public static int[] getRandomArray(int min, int max, int n) {
        int len = max - min + 1;

        if (max < min || n > len) {
            return null;
        }

        int[] source = new int[len];
        for (int i = 0; i < len; i++) {
            source[i] = min + i;
        }

        // partial Fisher-Yates: move a random remaining element out, shrink the pool
        int[] result = new int[n];
        int remaining = len;
        for (int i = 0; i < result.length; i++) {
            int index = rd.nextInt(remaining);
            remaining--;
            result[i] = source[index];
            source[index] = source[remaining];
        }
        return result;
    }

    /**
     * Picks {@code n} distinct values from {@code [min, max]} (both inclusive).
     *
     * @return a set of exactly {@code n} values; empty when {@code n <= 0}
     * @throws IllegalArgumentException if {@code n > 0} and the range holds fewer than
     *         {@code n} values
     */
    public static Collection<Integer> getRandomCollection(int min, int max, int n) {
        Set<Integer> res = new HashSet<Integer>();
        if (n <= 0) {
            return res;
        }
        int[] picked = getRandomArray(min, max, n);
        if (picked == null) {
            throw new IllegalArgumentException(
                "cannot pick " + n + " distinct values from [" + min + ", " + max + "]");
        }
        for (int value : picked) {
            res.add(value);
        }
        return res;
    }
}
/**
 * Static helpers that produce random strings and integers for tests.
 *
 * <p>Every method draws from one shared {@link Random} instance.
 */
public class RandomUtils {
    /** First character (inclusive, U+4E00) of the range used for random CJK text. */
    private static final int UNICODE_START = '\u4E00';
    /** End (exclusive, U+9FA0) of the range used for random CJK text. */
    private static final int UNICODE_END = '\u9FA0';
    private static final Random rd = new Random();

    private RandomUtils() {
    }

    /**
     * @return the textual form of a freshly generated random {@link UUID}
     */
    public static String getStringByUUID() {
        return UUID.randomUUID().toString();
    }

    /**
     * Builds a string of random characters taken from U+4E00 (inclusive) to U+9FA0 (exclusive).
     *
     * @param len number of characters; values below one yield an empty string
     * @return the random string
     */
    public static String getCheseWord(int len) {
        StringBuilder word = new StringBuilder();
        for (int i = 0; i < len; i++) {
            word.append(getCheseChar());
        }
        return word.toString();
    }

    /**
     * @param n number of characters; values below one yield an empty string
     * @return a string of {@code n} random decimal digits
     */
    public static String getStringWithNumber(int n) {
        // Pairs of [start, end) character bounds.
        int[] ranges = new int[] {'0', '9' + 1};
        return getString(n, ranges);
    }

    /**
     * @param n number of characters; values below one yield an empty string
     * @return a string of {@code n} random ASCII letters (lower or upper case)
     */
    public static String getStringWithCharacter(int n) {
        // Pairs of [start, end) character bounds.
        int[] ranges = new int[] {'a', 'z' + 1, 'A', 'Z' + 1};
        return getString(n, ranges);
    }

    /** Concatenates {@code n} characters, each drawn by {@link #getChar(int[])}. */
    private static String getString(int n, int[] arg) {
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < n; i++) {
            text.append(getChar(arg));
        }
        return text.toString();
    }

    /**
     * Picks one of the [start, end) pairs in {@code arg} at random, then a random
     * character inside that pair.
     */
    private static char getChar(int[] arg) {
        int start = rd.nextInt(arg.length / 2) * 2;
        return (char) getIntegerBetween(arg[start], arg[start + 1]);
    }

    /**
     * Returns a random integer in the half-open range between the two bounds.
     *
     * <p>The bounds may be passed in either order: the smaller one is inclusive and the
     * larger one exclusive. Previously a call with {@code m < n} produced values above
     * {@code n}, i.e. outside both bounds.
     *
     * @param n one bound
     * @param m the other bound
     * @return {@code n} when both bounds are equal, otherwise a value {@code v} with
     *         {@code min(n, m) <= v < max(n, m)}
     */
    public static int getIntegerBetween(int n, int m) {
        if (m == n) {
            return n;
        }
        int low = Math.min(n, m);
        int high = Math.max(n, m);
        // Work in long so that very wide ranges cannot overflow int arithmetic.
        long span = (long) high - low;
        long offset = (long) (rd.nextDouble() * span);
        return (int) (low + offset);
    }

    /**
     * @return a random integer in {@code [1, Integer.MAX_VALUE]}
     */
    public static int getIntegerMoreThanZero() {
        // nextInt(bound) yields [0, bound); shifting by one avoids the old retry loop.
        return rd.nextInt(Integer.MAX_VALUE) + 1;
    }

    /** Draws one random character from the configured CJK range. */
    private static char getCheseChar() {
        return (char) (UNICODE_START + rd.nextInt(UNICODE_END - UNICODE_START));
    }
}
b/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java new file mode 100644 index 0000000..591b3b7 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Miscellaneous static helpers for the integration tests: lenient number parsing,
 * simple waits, blocking on console input, and sorting a map by its values.
 */
public final class TestUtil {

    private TestUtil() {
    }

    /**
     * Parses {@code s} as a decimal {@code long}.
     *
     * @param s text to parse; may be {@code null}
     * @param defval value returned when {@code s} cannot be parsed
     * @return the parsed value, or {@code defval} on failure
     */
    public static Long parseStringToLong(String s, Long defval) {
        try {
            return Long.parseLong(s);
        } catch (NumberFormatException ignored) {
            // Unparseable (or null) input falls back to the caller's default.
            return defval;
        }
    }

    /**
     * Parses {@code s} as a decimal {@code int}.
     *
     * @param s text to parse; may be {@code null}
     * @param defval value returned when {@code s} cannot be parsed
     * @return the parsed value, or {@code defval} on failure
     */
    public static Integer parseStringToInteger(String s, Integer defval) {
        try {
            return Integer.parseInt(s);
        } catch (NumberFormatException ignored) {
            // Unparseable (or null) input falls back to the caller's default.
            return defval;
        }
    }

    /**
     * @param param text to wrap; {@code null} is rendered as the word {@code null}
     * @return {@code param} surrounded by single quotes
     */
    public static String addQuoteToParamater(String param) {
        return "'" + param + "'";
    }

    /**
     * Sleeps for {@code time} milliseconds. If the thread is interrupted the method
     * returns early and leaves the interrupt flag set.
     */
    public static void waitForMonment(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sleeps for {@code time} seconds. If the thread is interrupted the method
     * returns early and leaves the interrupt flag set.
     */
    public static void waitForSeconds(long time) {
        sleep(TimeUnit.SECONDS, time);
    }

    /**
     * Sleeps for {@code time} minutes. If the thread is interrupted the method
     * returns early and leaves the interrupt flag set.
     */
    public static void waitForMinutes(long time) {
        sleep(TimeUnit.MINUTES, time);
    }

    /** Sleeps for the given duration, restoring the interrupt flag when interrupted. */
    private static void sleep(TimeUnit unit, long time) {
        try {
            unit.sleep(time);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Blocks until the line {@code quit} is read from standard input. */
    public static void waitForInputQuit() {
        waitForInput("quit");
    }

    /**
     * Blocks until {@code keyWord} is read from standard input, using a default prompt.
     *
     * @param keyWord the line that ends the wait
     */
    public static void waitForInput(String keyWord) {
        waitForInput(keyWord,
            String.format("The thread will wait until you input stop command[%s]:", keyWord));
    }

    /**
     * Prints {@code info} and then blocks until a line equal to {@code keyWord} is read
     * from standard input, or until standard input reaches end of stream.
     *
     * <p>Earlier versions never displayed {@code info}, stripped line terminators only
     * from the first read, and failed with an index error once the stream was exhausted.
     * Input is now consumed line by line through a buffered reader, so bytes typed ahead
     * of the keyword may be consumed as well.
     *
     * @param keyWord the line that ends the wait
     * @param info prompt shown before waiting; skipped when {@code null}
     */
    public static void waitForInput(String keyWord, String info) {
        if (info != null) {
            System.out.println(info);
        }
        // Deliberately not closed: closing the reader would close System.in itself.
        java.io.BufferedReader reader =
            new java.io.BufferedReader(new java.io.InputStreamReader(System.in));
        try {
            String line = reader.readLine();
            while (line != null && !line.equals(keyWord)) {
                line = reader.readLine();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns a copy of {@code map} whose iteration order is ascending by value.
     *
     * @param map the entries to sort; it is not modified
     * @return a new {@link LinkedHashMap} holding the same entries in value order
     */
    public static <K, V extends Comparable<? super V>> Map<K, V> sortByValue(Map<K, V> map) {
        List<Map.Entry<K, V>> entries = new LinkedList<Map.Entry<K, V>>(map.entrySet());
        Collections.sort(entries, new Comparator<Map.Entry<K, V>>() {
            @Override
            public int compare(Map.Entry<K, V> left, Map.Entry<K, V> right) {
                return left.getValue().compareTo(right.getValue());
            }
        });

        // LinkedHashMap keeps the insertion (i.e. sorted) order.
        Map<K, V> sorted = new LinkedHashMap<K, V>();
        for (Map.Entry<K, V> entry : entries) {
            sorted.put(entry.getKey(), entry.getValue());
        }
        return sorted;
    }

}
/**
 * Sleep helpers for tests.
 *
 * <p>Each method returns early when the calling thread is interrupted and leaves the
 * thread's interrupt flag set, so callers further up the stack can still observe the
 * interruption. Earlier versions printed a stack trace and cleared the flag.
 */
public class TestUtils {
    /**
     * Sleeps for the given number of milliseconds.
     *
     * @param time duration in milliseconds
     */
    public static void waitForMonment(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sleeps for the given number of seconds.
     *
     * @param time duration in seconds
     */
    public static void waitForSeconds(long time) {
        sleep(TimeUnit.SECONDS, time);
    }

    /**
     * Sleeps for the given number of minutes.
     *
     * @param time duration in minutes
     */
    public static void waitForMinutes(long time) {
        sleep(TimeUnit.MINUTES, time);
    }

    /** Sleeps for the given duration, restoring the interrupt flag when interrupted. */
    private static void sleep(TimeUnit unit, long time) {
        try {
            unit.sleep(time);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

}
+ */ + +package org.apache.rocketmq.test.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import org.apache.log4j.Logger; +import org.apache.rocketmq.common.message.MessageExt; + +public class VerifyUtils { + private static Logger logger = Logger.getLogger(VerifyUtils.class); + + public static int verify(Collection<Object> sendMsgs, Collection<Object> recvMsgs) { + int miss = 0; + for (Object msg : sendMsgs) { + if (!recvMsgs.contains(msg)) { + miss++; + } + } + + return miss; + } + + public static Collection<Object> getFilterdMessage(Collection<Object> sendMsgs, + Collection<Object> recvMsgs) { + Collection<Object> recvMsgsSync = Collections.synchronizedCollection(recvMsgs); + Collection<Object> filterdMsgs = new ArrayList<Object>(); + int filterNum = 0; + for (Object msg : recvMsgsSync) { + if (sendMsgs.contains(msg)) { + filterdMsgs.add(msg); + } else { + filterNum++; + } + } + + logger.info(String.format("[%s] messages is filterd!", filterNum)); + return filterdMsgs; + } + + public static int verifyUserProperty(Collection<Object> sendMsgs, Collection<Object> recvMsgs) { + return 0; + } + + public static void verifyMessageQueueId(int expectId, Collection<Object> msgs) { + for (Object msg : msgs) { + MessageExt msgEx = (MessageExt) msg; + assert expectId == msgEx.getQueueId(); + } + } + + public static boolean verifyBalance(int msgSize, float error, int... recvSize) { + boolean balance = true; + int evenSize = msgSize / recvSize.length; + for (int size : recvSize) { + if (Math.abs(size - evenSize) > error * evenSize) { + balance = false; + break; + } + } + return balance; + } + + public static boolean verifyBalance(int msgSize, int... 
recvSize) { + return verifyBalance(msgSize, 0.1f, recvSize); + } + + public static boolean verifyDelay(long delayTimeMills, Collection<Object> recvMsgTimes, + int errorMills) { + boolean delay = true; + for (Object timeObj : recvMsgTimes) { + long time = (Long) timeObj; + if (Math.abs(time - delayTimeMills) > errorMills) { + delay = false; + logger.info(String.format("delay error:%s", Math.abs(time - delayTimeMills))); + } + } + return delay; + } + + public static boolean verifyDelay(long delayTimeMills, Collection<Object> recvMsgTimes) { + int errorMills = 500; + return verifyDelay(delayTimeMills, recvMsgTimes, errorMills); + } + + public static boolean verifyOrder(Collection<Collection<Object>> queueMsgs) { + for (Collection<Object> msgs : queueMsgs) { + if (!verifyOrderMsg(msgs)) { + return false; + } + } + return true; + + } + + public static boolean verifyOrderMsg(Collection<Object> msgs) { + int min = Integer.MIN_VALUE; + int curr; + if (msgs.size() == 0 || msgs.size() == 1) { + return true; + } else { + for (Object msg : msgs) { + curr = Integer.parseInt((String) msg); + if (curr < min) { + return false; + } else { + min = curr; + } + } + } + return true; + } + + public static boolean verifyRT(Collection<Object> rts, long maxRTMills) { + boolean rtExpect = true; + for (Object obj : rts) { + long rt = (Long) obj; + if (rt > maxRTMills) { + rtExpect = false; + logger.info(String.format("%s greater thran maxtRT:%s!", rt, maxRTMills)); + + } + } + return rtExpect; + } + + public static void main(String args[]) { + verifyBalance(400, 0.1f, 230, 190); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollector.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollector.java b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollector.java new file 
/**
 * Accumulates data items (for example received message bodies) so tests can inspect
 * them afterwards, including how often each item was recorded.
 */
public interface DataCollector {

    /**
     * Records one item. The implementations shipped with this interface ignore the call
     * while additions are locked.
     */
    void addData(Object data);

    /**
     * Removes {@code data} from the collector. Whether one or all recorded occurrences
     * disappear is implementation-specific.
     */
    void removeData(Object data);

    /**
     * Discards all recorded items. The implementations shipped with this interface also
     * re-enable additions.
     */
    void resetData();

    /** @return every recorded item, duplicates included */
    Collection<Object> getAllData();

    /** @return the distinct recorded items */
    Collection<Object> getAllDataWithoutDuplicate();

    /** @return the number of recorded items, duplicates included */
    long getDataSize();

    /** @return the number of distinct recorded items */
    long getDataSizeWithoutDuplicate();

    /**
     * NOTE(review): both implementations shipped with this interface return {@code true}
     * when {@code data} has been recorded exactly once, which does not match the method
     * name; confirm the intended meaning with the callers.
     */
    boolean isRepeatedData(Object data);

    /** @return how many times {@code data} has been recorded */
    int getRepeatedTimeForData(Object data);

    /** Stops further additions from being recorded. */
    void lockIncrement();

    /** Allows additions to be recorded again. */
    void unlockIncrement();
}
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollectorManager.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.util.data.collect; + +import java.util.HashMap; +import java.util.Map; +import org.apache.rocketmq.test.util.data.collect.impl.ListDataCollectorImpl; +import org.apache.rocketmq.test.util.data.collect.impl.MapDataCollectorImpl; + +public final class DataCollectorManager { + private static DataCollectorManager instance = new DataCollectorManager(); + private Map<String, DataCollector> collectMap = new HashMap<String, DataCollector>(); + private Object lock = new Object(); + + private DataCollectorManager() { + } + + public static DataCollectorManager getInstance() { + return instance; + } + + public DataCollector fetchDataCollector(String key) { + String realKey = key; + if (!collectMap.containsKey(realKey)) { + synchronized (lock) { + if (!collectMap.containsKey(realKey)) { + DataCollector collect = (DataCollector) new MapDataCollectorImpl(); + collectMap.put(realKey, collect); + } + } + } + return collectMap.get(realKey); + } + + public DataCollector fetchMapDataCollector(String key) { + String realKey = key; + if 
(!collectMap.containsKey(realKey) + || collectMap.get(realKey) instanceof ListDataCollectorImpl) { + synchronized (lock) { + if (!collectMap.containsKey(realKey) + || collectMap.get(realKey) instanceof ListDataCollectorImpl) { + DataCollector collect = null; + if (collectMap.containsKey(realKey)) { + DataCollector src = collectMap.get(realKey); + collect = new MapDataCollectorImpl(src.getAllData()); + } else { + collect = new MapDataCollectorImpl(); + } + collectMap.put(realKey, collect); + + } + } + } + return collectMap.get(realKey); + } + + public DataCollector fetchListDataCollector(String key) { + String realKey = key; + if (!collectMap.containsKey(realKey) + || collectMap.get(realKey) instanceof MapDataCollectorImpl) { + synchronized (lock) { + if (!collectMap.containsKey(realKey) + || collectMap.get(realKey) instanceof MapDataCollectorImpl) { + DataCollector collect = null; + if (collectMap.containsKey(realKey)) { + DataCollector src = collectMap.get(realKey); + collect = new ListDataCollectorImpl(src.getAllData()); + } else { + collect = new ListDataCollectorImpl(); + } + collectMap.put(realKey, collect); + } + } + } + return collectMap.get(realKey); + } + + public void resetDataCollect(String key) { + if (collectMap.containsKey(key)) { + collectMap.get(key).resetData(); + } + } + + public void resetAll() { + for (Map.Entry<String, DataCollector> entry : collectMap.entrySet()) { + entry.getValue().resetData(); + } + } + + public void removeDataCollect(String key) { + collectMap.remove(key); + } + + public void removeAll() { + collectMap.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataFilter.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataFilter.java b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataFilter.java new file mode 100644 
/**
 * Marker type of the data-collection package.
 *
 * <p>It currently declares no methods.
 */
public interface DataFilter {
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.util.data.collect.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import org.apache.rocketmq.test.util.data.collect.DataCollector; + +public class ListDataCollectorImpl implements DataCollector { + + private List<Object> datas = new ArrayList<Object>(); + private boolean lock = false; + + public ListDataCollectorImpl() { + + } + + public ListDataCollectorImpl(Collection<Object> datas) { + for (Object data : datas) { + addData(data); + } + } + + public Collection<Object> getAllData() { + return datas; + } + + public void resetData() { + datas.clear(); + unlockIncrement(); + } + + public long getDataSizeWithoutDuplicate() { + return getAllDataWithoutDuplicate().size(); + } + + public synchronized void addData(Object data) { + if (lock) { + return; + } + datas.add(data); + } + + public long getDataSize() { + return datas.size(); + } + + public boolean isRepeatedData(Object data) { + return Collections.frequency(datas, data) == 1; + } + + public Collection<Object> getAllDataWithoutDuplicate() { + return new HashSet<Object>(datas); + } + + public int getRepeatedTimeForData(Object data) { + int res = 0; + for (Object obj : datas) { + if (obj.equals(data)) { + res++; + } + } + return res; + } + + public void removeData(Object data) { + datas.remove(data); 
+ } + + public void lockIncrement() { + lock = true; + } + + public void unlockIncrement() { + lock = false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/MapDataCollectorImpl.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/MapDataCollectorImpl.java b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/MapDataCollectorImpl.java new file mode 100644 index 0000000..899bb85 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/MapDataCollectorImpl.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.util.data.collect.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.test.util.data.collect.DataCollector; + +public class MapDataCollectorImpl implements DataCollector { + + private Map<Object, AtomicInteger> datas = new ConcurrentHashMap<Object, AtomicInteger>(); + private boolean lock = false; + + public MapDataCollectorImpl() { + + } + + public MapDataCollectorImpl(Collection<Object> datas) { + for (Object data : datas) { + addData(data); + } + } + + public synchronized void addData(Object data) { + if (lock) { + return; + } + if (datas.containsKey(data)) { + datas.get(data).addAndGet(1); + } else { + datas.put(data, new AtomicInteger(1)); + } + } + + public Collection<Object> getAllData() { + List<Object> lst = new ArrayList<Object>(); + for (Entry<Object, AtomicInteger> entry : datas.entrySet()) { + for (int i = 0; i < entry.getValue().get(); i++) { + lst.add(entry.getKey()); + } + } + return lst; + } + + public long getDataSizeWithoutDuplicate() { + return datas.keySet().size(); + } + + public void resetData() { + datas.clear(); + unlockIncrement(); + } + + public long getDataSize() { + long sum = 0; + for (AtomicInteger count : datas.values()) { + sum = sum + count.get(); + } + return sum; + } + + public boolean isRepeatedData(Object data) { + if (datas.containsKey(data)) { + return datas.get(data).get() == 1; + } + return false; + } + + public Collection<Object> getAllDataWithoutDuplicate() { + return datas.keySet(); + } + + public int getRepeatedTimeForData(Object data) { + if (datas.containsKey(data)) { + return datas.get(data).intValue(); + } + return 0; + } + + public void removeData(Object data) { + datas.remove(data); + } + + public void lockIncrement() { + lock = true; + } + + public void 
unlockIncrement() { + lock = false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTask.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTask.java b/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTask.java new file mode 100644 index 0000000..a4ad9a8 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTask.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A thread whose work is defined by {@link #execute()} and which can report completion
 * through an optional {@link CountDownLatch}.
 */
public abstract class ParallelTask extends Thread {
    private CountDownLatch latch = null;

    /** @return the latch counted down on completion, or {@code null} when none is set */
    public CountDownLatch getLatch() {
        return latch;
    }

    /**
     * @param latch latch to count down once {@link #execute()} has finished; may be
     *        {@code null}
     */
    public void setLatch(CountDownLatch latch) {
        this.latch = latch;
    }

    /** The work performed by this task. */
    public abstract void execute();

    /**
     * Runs {@link #execute()} and then counts the latch down, if one is set.
     *
     * <p>The count-down happens in a {@code finally} block, so a task that fails with an
     * exception still releases whoever is waiting on the latch. Previously such a
     * failure left the waiter blocked forever.
     */
    @Override
    public void run() {
        try {
            this.execute();
        } finally {
            CountDownLatch current = latch;
            if (current != null) {
                current.countDown();
            }
        }
    }
}
+ */ + +package org.apache.rocketmq.test.util.parallel; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class ParallelTaskExecutor { + public List<ParallelTask> tasks = new ArrayList<ParallelTask>(); + public ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); + public CountDownLatch latch = null; + + public ParallelTaskExecutor() { + + } + + public void pushTask(ParallelTask task) { + tasks.add(task); + } + + public void startBlock() { + init(); + startTask(); + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void startNoBlock() { + for (ParallelTask task : tasks) { + cachedThreadPool.execute(task); + } + } + + private void init() { + latch = new CountDownLatch(tasks.size()); + for (ParallelTask task : tasks) { + task.setLatch(latch); + } + } + + private void startTask() { + for (ParallelTask task : tasks) { + task.start(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/parallel/Task4Test.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/util/parallel/Task4Test.java b/test/src/main/java/org/apache/rocketmq/test/util/parallel/Task4Test.java new file mode 100644 index 0000000..c168d66 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/util/parallel/Task4Test.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.util.parallel; + +public class Task4Test extends ParallelTask { + private String name = ""; + + public Task4Test(String name) { + this.name = name; + } + + @Override + public void execute() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java new file mode 100644 index 0000000..57462a2 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.base; + +import java.util.ArrayList; +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer; +import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; +import org.apache.rocketmq.test.factory.ConsumerFactory; +import org.apache.rocketmq.test.listener.AbstractListener; +import org.apache.rocketmq.test.util.MQAdmin; +import org.apache.rocketmq.test.util.MQRandomUtils; +import org.apache.rocketmq.test.util.TestUtils; +import org.junit.Assert; + +public class BaseConf { + protected static String nsAddr; + protected static String broker1Name; + protected static String broker2Name; + protected static String clusterName; + protected static int brokerNum; + protected static int waitTime = 5; + protected static int consumeTime = 1 * 60 * 1000; + protected static int topicCreateTime = 30 * 1000; + protected static NamesrvController namesrvController; + protected static BrokerController brokerController1; + protected static BrokerController brokerController2; + protected static List<Object> mqClients = new ArrayList<Object>(); + protected static boolean debug = false; + private static Logger log = Logger.getLogger(BaseConf.class); + + static { + namesrvController = IntegrationTestBase.createAndStartNamesrv(); + nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort(); + brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr); + brokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr); + clusterName = brokerController1.getBrokerConfig().getBrokerClusterName(); + broker1Name = 
brokerController1.getBrokerConfig().getBrokerName(); + broker2Name = brokerController2.getBrokerConfig().getBrokerName(); + brokerNum = 2; + } + + public BaseConf() { + + } + + public static String initTopic() { + long startTime = System.currentTimeMillis(); + String topic = MQRandomUtils.getRandomTopic(); + boolean createResult = false; + while (true) { + createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8); + if (createResult) { + break; + } else if (System.currentTimeMillis() - startTime > topicCreateTime) { + Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic, + System.currentTimeMillis() - startTime)); + break; + } else { + TestUtils.waitForMonment(500); + continue; + } + } + + return topic; + } + + public static String initConsumerGroup() { + String group = MQRandomUtils.getRandomConsumerGroup(); + return initConsumerGroup(group); + } + + public static String initConsumerGroup(String group) { + MQAdmin.createSub(nsAddr, clusterName, group); + return group; + } + + public static RMQNormalProducer getProducer(String nsAddr, String topic) { + RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic); + if (debug) { + producer.setDebug(); + } + mqClients.add(producer); + return producer; + } + + public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup, + String instanceName) { + RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup, + instanceName); + if (debug) { + producer.setDebug(); + } + mqClients.add(producer); + return producer; + } + + public static RMQAsyncSendProducer getAsyncProducer(String nsAddr, String topic) { + RMQAsyncSendProducer producer = new RMQAsyncSendProducer(nsAddr, topic); + if (debug) { + producer.setDebug(); + } + mqClients.add(producer); + return producer; + } + + public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression, + AbstractListener listner) { + String consumerGroup = 
initConsumerGroup(); + return getConsumer(nsAddr, consumerGroup, topic, subExpression, listner); + } + + public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic, + String subExpression, AbstractListener listner) { + RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup, + topic, subExpression, listner); + if (debug) { + consumer.setDebug(); + } + mqClients.add(consumer); + log.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup, + topic, subExpression)); + return consumer; + } + + public static void shutDown() { + try { + for (Object mqClient : mqClients) { + if (mqClient instanceof AbstractMQProducer) { + ((AbstractMQProducer) mqClient).shutdown(); + + } else { + ((AbstractMQConsumer) mqClient).shutdown(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java new file mode 100644 index 0000000..ff9996d --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.base; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.namesrv.NamesrvConfig; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IntegrationTestBase { + protected static final String SEP = File.separator; + protected static final String BROKER_NAME_PREFIX = "TestBrokerName_"; + protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0); + protected static final List<File> TMPE_FILES = new ArrayList<>(); + protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<>(); + protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<>(); + public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class); + protected static Random random = new Random(); + + static { + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override public void run() { + for (NamesrvController namesrvController : NAMESRV_CONTROLLERS) { + if (namesrvController != null) { + namesrvController.shutdown(); + } + } + for 
(BrokerController brokerController : BROKER_CONTROLLERS) { + if (brokerController != null) { + brokerController.shutdown(); + } + } + for (File file : TMPE_FILES) { + deleteFile(file); + } + } + }); + + } + + private static String createBaseDir() { + String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID(); + final File file = new File(baseDir); + if (file.exists()) { + logger.info(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir)); + System.exit(1); + } + TMPE_FILES.add(file); + return baseDir; + } + + public static NamesrvController createAndStartNamesrv() { + String baseDir = createBaseDir(); + NamesrvConfig namesrvConfig = new NamesrvConfig(); + NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig(); + namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json"); + namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + "namesrv.properties"); + + nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000)); + NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig); + try { + Assert.assertTrue(namesrvController.initialize()); + logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort()); + namesrvController.start(); + } catch (Exception e) { + logger.info("Name Server start failed"); + System.exit(1); + } + NAMESRV_CONTROLLERS.add(namesrvController); + return namesrvController; + + } + + public static BrokerController createAndStartBroker(String nsAddr) { + String baseDir = createBaseDir(); + BrokerConfig brokerConfig = new BrokerConfig(); + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + NettyClientConfig nettyClientConfig = new NettyClientConfig(); + MessageStoreConfig storeConfig = new MessageStoreConfig(); + brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement()); + brokerConfig.setBrokerIP1("127.0.0.1"); 
+ brokerConfig.setNamesrvAddr(nsAddr); + storeConfig.setStorePathRootDir(baseDir); + storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); + storeConfig.setHaListenPort(8000 + random.nextInt(1000)); + nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); + BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); + try { + Assert.assertTrue(brokerController.initialize()); + logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); + brokerController.start(); + } catch (Exception e) { + logger.info("Broker start failed"); + System.exit(1); + } + BROKER_CONTROLLERS.add(brokerController); + return brokerController; + } + + public static void deleteFile(File file) { + if (!file.exists()) { + return; + } + if (file.isFile()) { + file.delete(); + } else if (file.isDirectory()) { + File[] files = file.listFiles(); + for (File file1 : files) { + deleteFile(file1); + } + file.delete(); + } + } + +}
