http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java new file mode 100644 index 0000000..b4b72fc --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

/**
 * $Id: SubscriptionData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
 */
package com.alibaba.rocketmq.common.protocol.heartbeat;

import com.alibaba.fastjson.annotation.JSONField;

import java.util.HashSet;
import java.util.Set;


/**
 * Mutable value object describing one subscription: a topic, the raw
 * subscription string, a set of tags, a set of integer codes, a version stamp
 * and an optional filter-class source text.
 *
 * <p>Equality takes {@code classFilterMode}, {@code codeSet}, {@code subString},
 * {@code subVersion}, {@code tagsSet} and {@code topic} into account. The hash
 * code is built from the same fields except {@code subVersion}.
 * {@code filterClassSource} takes part in neither. Natural ordering compares
 * the string {@code topic + "@" + subString}.
 *
 * @author shijia.wxr
 */
public class SubscriptionData implements Comparable<SubscriptionData> {
    /** Wildcard subscription string; how it is interpreted is decided outside this class. */
    public final static String SUB_ALL = "*";

    private boolean classFilterMode = false;
    private String topic;
    private String subString;
    private Set<String> tagsSet = new HashSet<String>();
    private Set<Integer> codeSet = new HashSet<Integer>();
    /** Defaults to the wall-clock time at which this instance was created. */
    private long subVersion = System.currentTimeMillis();

    /** Marked {@code serialize = false}, i.e. excluded when fastjson serializes this object. */
    @JSONField(serialize = false)
    private String filterClassSource;


    /** Creates an empty subscription; topic and subscription string stay {@code null}. */
    public SubscriptionData() {
    }


    /**
     * Creates a subscription for the given topic and subscription string.
     *
     * @param topic     topic name
     * @param subString raw subscription string
     */
    public SubscriptionData(String topic, String subString) {
        this.topic = topic;
        this.subString = subString;
    }


    /** Null-tolerant {@code equals}: two nulls are equal, a null never equals a non-null. */
    private static boolean sameOrBothNull(Object left, Object right) {
        if (left == null) {
            return right == null;
        }
        return left.equals(right);
    }


    /** Hash code of {@code o}, or 0 when {@code o} is {@code null}. */
    private static int hashOrZero(Object o) {
        return o == null ? 0 : o.hashCode();
    }


    public String getFilterClassSource() {
        return this.filterClassSource;
    }


    public void setFilterClassSource(String filterClassSource) {
        this.filterClassSource = filterClassSource;
    }


    public String getTopic() {
        return this.topic;
    }


    public void setTopic(String topic) {
        this.topic = topic;
    }


    public String getSubString() {
        return this.subString;
    }


    public void setSubString(String subString) {
        this.subString = subString;
    }


    /** @return the live tag set (not a copy) */
    public Set<String> getTagsSet() {
        return this.tagsSet;
    }


    public void setTagsSet(Set<String> tagsSet) {
        this.tagsSet = tagsSet;
    }


    public long getSubVersion() {
        return this.subVersion;
    }


    public void setSubVersion(long subVersion) {
        this.subVersion = subVersion;
    }


    /** @return the live code set (not a copy) */
    public Set<Integer> getCodeSet() {
        return this.codeSet;
    }


    public void setCodeSet(Set<Integer> codeSet) {
        this.codeSet = codeSet;
    }


    public boolean isClassFilterMode() {
        return this.classFilterMode;
    }


    public void setClassFilterMode(boolean classFilterMode) {
        this.classFilterMode = classFilterMode;
    }


    /**
     * Combines {@code classFilterMode}, {@code codeSet}, {@code subString},
     * {@code tagsSet} and {@code topic} (in that order) with multiplier 31.
     */
    @Override
    public int hashCode() {
        int hash = 1;
        hash = 31 * hash + (this.classFilterMode ? 1231 : 1237);
        hash = 31 * hash + hashOrZero(this.codeSet);
        hash = 31 * hash + hashOrZero(this.subString);
        hash = 31 * hash + hashOrZero(this.tagsSet);
        hash = 31 * hash + hashOrZero(this.topic);
        return hash;
    }


    /**
     * Two instances are equal when they are of exactly the same class and agree
     * on filter mode, version, code set, subscription string, tag set and topic.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final SubscriptionData that = (SubscriptionData) obj;
        return this.classFilterMode == that.classFilterMode
            && this.subVersion == that.subVersion
            && sameOrBothNull(this.codeSet, that.codeSet)
            && sameOrBothNull(this.subString, that.subString)
            && sameOrBothNull(this.tagsSet, that.tagsSet)
            && sameOrBothNull(this.topic, that.topic);
    }


    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("SubscriptionData [classFilterMode=");
        text.append(this.classFilterMode);
        text.append(", topic=").append(this.topic);
        text.append(", subString=").append(this.subString);
        text.append(", tagsSet=").append(this.tagsSet);
        text.append(", codeSet=").append(this.codeSet);
        text.append(", subVersion=").append(this.subVersion);
        text.append("]");
        return text.toString();
    }


    /** Orders subscriptions lexicographically by {@code topic + "@" + subString}. */
    @Override
    public int compareTo(SubscriptionData other) {
        final String mine = this.topic + "@" + this.subString;
        final String theirs = other.topic + "@" + other.subString;
        return mine.compareTo(theirs);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java new file mode 100644 index 0000000..322953a --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

/**
 * $Id: BrokerData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
 */
package com.alibaba.rocketmq.common.protocol.route;

import com.alibaba.rocketmq.common.MixAll;

import java.util.HashMap;
import java.util.Map;


/**
 * Describes one named broker: the cluster it belongs to, its name, and a table
 * mapping broker id to broker address.
 *
 * <p>{@code equals}, {@code hashCode} and {@code toString} consider only
 * {@code brokerName} and {@code brokerAddrs}; {@code cluster} is not part of
 * them. Natural ordering is by {@code brokerName}.
 *
 * @author shijia.wxr
 */
public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

    /**
     * Picks an address for this broker.
     *
     * <p>The address registered under {@code MixAll.MASTER_ID} is preferred.
     * When there is none, the first address the table's iteration yields is
     * returned instead.
     *
     * @return an address, or {@code null} when the address table is unset or
     *         holds no usable entry
     */
    public String selectBrokerAddr() {
        // The table is only assigned through setBrokerAddrs(); guard against use before that.
        if (null == this.brokerAddrs) {
            return null;
        }

        String masterAddr = this.brokerAddrs.get(MixAll.MASTER_ID);
        if (null != masterAddr) {
            return masterAddr;
        }

        // No master entry: fall back to whichever address comes first.
        return this.brokerAddrs.isEmpty() ? null : this.brokerAddrs.values().iterator().next();
    }

    /** @return the live address table (not a copy); may be {@code null} */
    public HashMap<Long, String> getBrokerAddrs() {
        return brokerAddrs;
    }

    public void setBrokerAddrs(HashMap<Long, String> brokerAddrs) {
        this.brokerAddrs = brokerAddrs;
    }

    public String getCluster() {
        return cluster;
    }

    public void setCluster(String cluster) {
        this.cluster = cluster;
    }

    /** Hash of {@code brokerAddrs} and {@code brokerName}; {@code cluster} is not included. */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((brokerAddrs == null) ? 0 : brokerAddrs.hashCode());
        result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
        return result;
    }

    /** Equal when both are exactly {@code BrokerData} with equal address tables and names. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        BrokerData other = (BrokerData) obj;
        if (brokerAddrs == null) {
            if (other.brokerAddrs != null) {
                return false;
            }
        } else if (!brokerAddrs.equals(other.brokerAddrs)) {
            return false;
        }
        if (brokerName == null) {
            return other.brokerName == null;
        }
        return brokerName.equals(other.brokerName);
    }

    @Override
    public String toString() {
        return "BrokerData [brokerName=" + brokerName + ", brokerAddrs=" + brokerAddrs + "]";
    }

    /** Orders by broker name; throws {@code NullPointerException} if this name is {@code null}. */
    @Override
    public int compareTo(BrokerData o) {
        return this.brokerName.compareTo(o.getBrokerName());
    }

    public String getBrokerName() {
        return brokerName;
    }

    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java new file mode 100644 index 0000000..6f62340 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

/**
 * $Id: QueueData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
 */
package com.alibaba.rocketmq.common.protocol.route;

/**
 * Mutable bean holding queue settings for one broker: the broker name, the
 * read and write queue counts, a permission value and a topic sync flag.
 *
 * <p>All five fields take part in {@code equals} and {@code hashCode}; natural
 * ordering looks at {@code brokerName} only.
 */
public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;

    public int getReadQueueNums() {
        return this.readQueueNums;
    }

    public void setReadQueueNums(int readQueueNums) {
        this.readQueueNums = readQueueNums;
    }

    public int getWriteQueueNums() {
        return this.writeQueueNums;
    }

    public void setWriteQueueNums(int writeQueueNums) {
        this.writeQueueNums = writeQueueNums;
    }

    public int getPerm() {
        return this.perm;
    }

    public void setPerm(int perm) {
        this.perm = perm;
    }

    public int getTopicSynFlag() {
        return this.topicSynFlag;
    }

    public void setTopicSynFlag(int topicSynFlag) {
        this.topicSynFlag = topicSynFlag;
    }

    /**
     * Folds brokerName, perm, readQueueNums, writeQueueNums and topicSynFlag
     * (in that order) with multiplier 31, starting from 1.
     */
    @Override
    public int hashCode() {
        final int nameHash = (this.brokerName == null) ? 0 : this.brokerName.hashCode();
        final int[] parts = {nameHash, this.perm, this.readQueueNums, this.writeQueueNums, this.topicSynFlag};
        int hash = 1;
        for (int i = 0; i < parts.length; i++) {
            hash = 31 * hash + parts[i];
        }
        return hash;
    }

    /** Equal when the other object is exactly a {@code QueueData} with the same five field values. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final QueueData that = (QueueData) obj;
        final boolean sameName =
            (this.brokerName == null) ? that.brokerName == null : this.brokerName.equals(that.brokerName);
        return sameName
            && this.perm == that.perm
            && this.readQueueNums == that.readQueueNums
            && this.writeQueueNums == that.writeQueueNums
            && this.topicSynFlag == that.topicSynFlag;
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("QueueData [brokerName=");
        text.append(this.brokerName);
        text.append(", readQueueNums=").append(this.readQueueNums);
        text.append(", writeQueueNums=").append(this.writeQueueNums);
        text.append(", perm=").append(this.perm);
        text.append(", topicSynFlag=").append(this.topicSynFlag);
        text.append("]");
        return text.toString();
    }

    /** Orders by broker name; throws {@code NullPointerException} if this name is {@code null}. */
    @Override
    public int compareTo(QueueData o) {
        final String otherName = o.getBrokerName();
        return this.brokerName.compareTo(otherName);
    }

    public String getBrokerName() {
        return this.brokerName;
    }

    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java new file mode 100644 index 0000000..72e1b96 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

/**
 * $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
 */
package com.alibaba.rocketmq.common.protocol.route;

import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;


/**
 * Route information for a topic: an order-topic configuration string, the
 * queue descriptions, the broker descriptions and a table of filter servers
 * keyed by broker address.
 *
 * @author shijia.wxr
 */
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;


    /**
     * Builds a copy with fresh containers.
     *
     * <p>The returned object always has non-null lists and table, even when
     * this object's are {@code null}. Only the containers are new: the
     * {@code QueueData}, {@code BrokerData} and filter-server list instances
     * are shared with this object.
     *
     * @return the copy
     */
    public TopicRouteData cloneTopicRouteData() {
        final List<QueueData> queues = new ArrayList<QueueData>();
        final List<BrokerData> brokers = new ArrayList<BrokerData>();
        final HashMap<String, List<String>> filterServers = new HashMap<String, List<String>>();

        final TopicRouteData copy = new TopicRouteData();
        copy.setQueueDatas(queues);
        copy.setBrokerDatas(brokers);
        copy.setFilterServerTable(filterServers);
        copy.setOrderTopicConf(this.orderTopicConf);

        if (null != this.queueDatas) {
            copy.getQueueDatas().addAll(this.queueDatas);
        }
        if (null != this.brokerDatas) {
            copy.getBrokerDatas().addAll(this.brokerDatas);
        }
        if (null != this.filterServerTable) {
            copy.getFilterServerTable().putAll(this.filterServerTable);
        }
        return copy;
    }


    public List<QueueData> getQueueDatas() {
        return this.queueDatas;
    }


    public void setQueueDatas(List<QueueData> queueDatas) {
        this.queueDatas = queueDatas;
    }


    public List<BrokerData> getBrokerDatas() {
        return this.brokerDatas;
    }


    public void setBrokerDatas(List<BrokerData> brokerDatas) {
        this.brokerDatas = brokerDatas;
    }

    public HashMap<String, List<String>> getFilterServerTable() {
        return this.filterServerTable;
    }

    public void setFilterServerTable(HashMap<String, List<String>> filterServerTable) {
        this.filterServerTable = filterServerTable;
    }

    public String getOrderTopicConf() {
        return this.orderTopicConf;
    }

    public void setOrderTopicConf(String orderTopicConf) {
        this.orderTopicConf = orderTopicConf;
    }

    /** Hash code of {@code o}, or 0 when {@code o} is {@code null}. */
    private static int hashOrZero(Object o) {
        return o == null ? 0 : o.hashCode();
    }

    /** Null-tolerant {@code equals}: two nulls are equal, a null never equals a non-null. */
    private static boolean sameOrBothNull(Object left, Object right) {
        return left == null ? right == null : left.equals(right);
    }

    /**
     * Folds brokerDatas, orderTopicConf, queueDatas and filterServerTable
     * (in that order) with multiplier 31, starting from 1.
     */
    @Override
    public int hashCode() {
        int hash = 1;
        hash = 31 * hash + hashOrZero(this.brokerDatas);
        hash = 31 * hash + hashOrZero(this.orderTopicConf);
        hash = 31 * hash + hashOrZero(this.queueDatas);
        hash = 31 * hash + hashOrZero(this.filterServerTable);
        return hash;
    }

    /** Equal when the other object is exactly a {@code TopicRouteData} whose four fields are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final TopicRouteData that = (TopicRouteData) obj;
        return sameOrBothNull(this.brokerDatas, that.brokerDatas)
            && sameOrBothNull(this.orderTopicConf, that.orderTopicConf)
            && sameOrBothNull(this.queueDatas, that.queueDatas)
            && sameOrBothNull(this.filterServerTable, that.filterServerTable);
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("TopicRouteData [orderTopicConf=");
        text.append(this.orderTopicConf);
        text.append(", queueDatas=").append(this.queueDatas);
        text.append(", brokerDatas=").append(this.brokerDatas);
        text.append(", filterServerTable=").append(this.filterServerTable);
        text.append("]");
        return text.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java new file mode 100644 index 0000000..86bdd3d --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.rocketmq.common.protocol.topic;

import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;


/**
 * Mutable bean carrying a consumer group, a message queue and two offsets:
 * the one that was requested and the new one.
 *
 * <p>NOTE(review): judging by the names, this reports that a consumer's
 * requested offset was replaced by another offset; who emits and who consumes
 * the event is not visible in this file.
 */
public class OffsetMovedEvent extends RemotingSerializable {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private long offsetRequest;
    private long offsetNew;


    public String getConsumerGroup() {
        return this.consumerGroup;
    }


    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }


    public MessageQueue getMessageQueue() {
        return this.messageQueue;
    }


    public void setMessageQueue(MessageQueue messageQueue) {
        this.messageQueue = messageQueue;
    }


    public long getOffsetRequest() {
        return this.offsetRequest;
    }


    public void setOffsetRequest(long offsetRequest) {
        this.offsetRequest = offsetRequest;
    }


    public long getOffsetNew() {
        return this.offsetNew;
    }


    public void setOffsetNew(long offsetNew) {
        this.offsetNew = offsetNew;
    }


    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("OffsetMovedEvent [consumerGroup=");
        text.append(this.consumerGroup);
        text.append(", messageQueue=").append(this.messageQueue);
        text.append(", offsetRequest=").append(this.offsetRequest);
        text.append(", offsetNew=").append(this.offsetNew);
        text.append("]");
        return text.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java new file mode 100644 index 0000000..8fc4e76 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package com.alibaba.rocketmq.common.queue;

import com.alibaba.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;


/**
 * A {@link TreeMap} paired with a {@link RoundQueue} of keys, with every
 * operation performed under one fair {@link ReentrantLock}.
 *
 * <p>thread safe
 *
 * @author lansheng.zj
 */
public class ConcurrentTreeMap<K, V> {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final ReentrantLock lock;
    private TreeMap<K, V> tree;
    private RoundQueue<K> roundQueue;


    /**
     * @param capacity   capacity handed to the key {@link RoundQueue}
     * @param comparator ordering used by the underlying tree
     */
    public ConcurrentTreeMap(int capacity, Comparator<? super K> comparator) {
        this.tree = new TreeMap<K, V>(comparator);
        this.roundQueue = new RoundQueue<K>(capacity);
        // 'true' requests a fair lock.
        this.lock = new ReentrantLock(true);
    }


    /**
     * Removes and returns the entry with the lowest key.
     *
     * @return that entry, or {@code null} if the tree is empty
     */
    public Map.Entry<K, V> pollFirstEntry() {
        this.lock.lock();
        try {
            return this.tree.pollFirstEntry();
        } finally {
            this.lock.unlock();
        }
    }


    /**
     * Offers {@code key} to the round queue and returns the value the tree
     * holds for it afterwards.
     *
     * <p>When the round queue accepts the key (it was not already queued), the
     * mapping is created if the tree has none, and a log line is written.
     * When the round queue rejects the key, the tree is only read, so the
     * result is {@code null} if the entry is no longer in the tree.
     *
     * @param key   key to look up or insert
     * @param value value stored when the key is accepted and unmapped
     * @return the value mapped to {@code key} after the call, possibly {@code null}
     */
    public V putIfAbsentAndRetExsit(K key, V value) {
        this.lock.lock();
        try {
            final boolean accepted = this.roundQueue.put(key);
            if (!accepted) {
                return this.tree.get(key);
            }

            V current = this.tree.get(key);
            if (current == null) {
                this.tree.put(key, value);
                current = value;
            }
            log.warn("putIfAbsentAndRetExsit success. {}", key);
            return current;
        } finally {
            this.lock.unlock();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java new file mode 100644 index 0000000..a3783ba --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.rocketmq.common.queue;

import java.util.LinkedList;
import java.util.Queue;


/**
 * FIFO queue of distinct elements with a size limit: once the limit is
 * reached, adding a new element first discards the oldest one.
 *
 * <p>not thread safe
 *
 * @author lansheng.zj
 */
public class RoundQueue<E> {

    private Queue<E> queue;
    private int capacity;


    /**
     * @param capacity size at which the oldest element is dropped before a new one is added
     */
    public RoundQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new LinkedList<E>();
    }


    /**
     * Adds {@code e} unless it is already queued.
     *
     * @param e element to add
     * @return {@code true} if the element was added, {@code false} if it was
     *         already present (the queue is left untouched in that case)
     */
    public boolean put(E e) {
        if (this.queue.contains(e)) {
            return false;
        }

        // Make room by dropping the head once the limit has been reached.
        if (this.queue.size() >= this.capacity) {
            this.queue.poll();
        }
        this.queue.add(e);
        return true;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java new file mode 100644 index 0000000..aa0bc54 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.common.running;

/**
 * Names of runtime statistics.
 *
 * <p>NOTE(review): the constant names suggest commit-log offsets, disk usage
 * ratios and a schedule-message offset; the exact meaning and units are set
 * by the code that reports them, which is not visible in this file.
 */
public enum RunningStats {
    commitLogMaxOffset,
    commitLogMinOffset,
    commitLogDiskRatio,
    consumeQueueDiskRatio,
    scheduleMessageOffset
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java new file mode 100644 index 0000000..89eefa5 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.rocketmq.common.stats;

import com.alibaba.rocketmq.common.UtilAll;
import org.slf4j.Logger;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


/**
 * A single named counter ({@code statsName} / {@code statsKey}) whose current
 * value can be printed to a logger.
 *
 * <p>{@link #init()} schedules a recurring task that prints the value and then
 * resets it to zero; nothing is scheduled unless {@code init()} is called.
 */
public class MomentStatsItem {

    private final AtomicLong value = new AtomicLong(0);

    private final String statsName;
    private final String statsKey;
    private final ScheduledExecutorService scheduledExecutorService;
    private final Logger log;


    /**
     * @param statsName                name of the statistic, used in the log line
     * @param statsKey                 key of this item, used in the log line
     * @param scheduledExecutorService executor used by {@link #init()}
     * @param log                      logger that receives the printed value
     */
    public MomentStatsItem(String statsName, String statsKey,
                           ScheduledExecutorService scheduledExecutorService, Logger log) {
        this.statsName = statsName;
        this.statsKey = statsKey;
        this.scheduledExecutorService = scheduledExecutorService;
        this.log = log;
    }


    /**
     * Schedules the print-and-reset task at a fixed rate of five minutes.
     *
     * <p>The initial delay is the distance between now and the time returned
     * by {@code UtilAll.computNextMinutesTimeMillis()}.
     */
    public void init() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    printAtMinutes();

                    MomentStatsItem.this.value.set(0);
                } catch (Throwable e) {
                    // Must not escape: an exception thrown from a fixed-rate task
                    // suppresses all of its later executions. Report it instead of
                    // dropping it silently.
                    log.warn(String.format("[%s] [%s] print moment stats failed",
                        MomentStatsItem.this.statsName,
                        MomentStatsItem.this.statsKey), e);
                }
            }
        }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
    }


    /** Writes the stats name, key and current value to the logger at INFO level. */
    public void printAtMinutes() {
        log.info(String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d",
            this.statsName,
            this.statsKey,
            this.value.get()));
    }

    /** @return the live counter, so callers can update it directly */
    public AtomicLong getValue() {
        return value;
    }


    public String getStatsKey() {
        return statsKey;
    }


    public String getStatsName() {
        return statsName;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java new file mode 100644 index 0000000..fde88cd --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package com.alibaba.rocketmq.common.stats;

import com.alibaba.rocketmq.common.UtilAll;
import org.slf4j.Logger;

import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/**
 * A keyed collection of {@link MomentStatsItem}s that share one stats name,
 * one executor and one logger.
 *
 * <p>The constructor schedules a recurring task that prints every item in the
 * table. Items are created lazily by {@link #getAndCreateStatsItem(String)}.
 */
public class MomentStatsItemSet {
    private final ConcurrentHashMap<String/* key */, MomentStatsItem> statsItemTable =
        new ConcurrentHashMap<String, MomentStatsItem>(128);
    private final String statsName;
    private final ScheduledExecutorService scheduledExecutorService;
    private final Logger log;


    /**
     * Creates the set and immediately schedules the periodic print task.
     *
     * @param statsName                name shared by all items of this set
     * @param scheduledExecutorService executor running the print task
     * @param log                      logger handed to every item
     */
    public MomentStatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
        this.statsName = statsName;
        this.scheduledExecutorService = scheduledExecutorService;
        this.log = log;
        this.init();
    }

    /** @return the live item table (not a copy) */
    public ConcurrentHashMap<String, MomentStatsItem> getStatsItemTable() {
        return statsItemTable;
    }

    public String getStatsName() {
        return statsName;
    }

    /**
     * Schedules the print task at a fixed rate of five minutes.
     *
     * <p>The initial delay is the distance between now and the time returned
     * by {@code UtilAll.computNextMinutesTimeMillis()}. The constructor
     * already calls this method once.
     */
    public void init() {

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    printAtMinutes();
                } catch (Throwable e) {
                    // Must not escape: an exception thrown from a fixed-rate task
                    // suppresses all of its later executions. Report it instead of
                    // dropping it silently.
                    log.warn(String.format("[%s] print moment stats failed",
                        MomentStatsItemSet.this.statsName), e);
                }
            }
        }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
    }

    /** Prints every item currently in the table. */
    private void printAtMinutes() {
        for (MomentStatsItem item : this.statsItemTable.values()) {
            item.printAtMinutes();
        }
    }

    /**
     * Sets the value of the item registered under {@code statsKey}, creating
     * the item first if necessary.
     *
     * @param statsKey key of the item
     * @param value    new value
     */
    public void setValue(final String statsKey, final int value) {
        MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
        statsItem.getValue().set(value);
    }

    /**
     * Returns the item registered under {@code statsKey}, creating and
     * registering one if none exists.
     *
     * <p>All callers racing on the same key receive the same instance: the
     * new item is registered with {@code putIfAbsent}, and a caller that loses
     * the race gets the item that won. A plain {@code put} would replace the
     * winner and leave one caller updating an item that is no longer in the
     * table.
     *
     * <p>Items are not scheduled individually here; the set's own periodic
     * task prints them.
     *
     * @param statsKey key of the item
     * @return the item registered under {@code statsKey}, never {@code null}
     */
    public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
        MomentStatsItem statsItem = this.statsItemTable.get(statsKey);
        if (null == statsItem) {
            statsItem =
                new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
            MomentStatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);

            if (null != prev) {
                // Another thread registered an item first; use the registered one.
                statsItem = prev;
            }
        }

        return statsItem;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java new file mode 100644 index 0000000..1c99699 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common.stats; + +import com.alibaba.rocketmq.common.UtilAll; +import org.slf4j.Logger; + +import java.util.LinkedList; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + + +public class StatsItem { + + private final AtomicLong value = new AtomicLong(0); + + private final AtomicLong times = new AtomicLong(0); + + private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>(); + + + private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>(); + + + private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>(); + + private final String statsName; + private final String statsKey; + private final ScheduledExecutorService scheduledExecutorService; + private final Logger log; + + + public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService, + Logger log) { + this.statsName = statsName; + this.statsKey = statsKey; + this.scheduledExecutorService = scheduledExecutorService; + this.log = log; + } + + public StatsSnapshot getStatsDataInMinute() { + return computeStatsData(this.csListMinute); + } + + private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) { + StatsSnapshot statsSnapshot = new StatsSnapshot(); + synchronized (csList) { + double tps = 0; + double avgpt = 0; + long sum = 0; + if (!csList.isEmpty()) { + CallSnapshot first = csList.getFirst(); + CallSnapshot last = csList.getLast(); + sum = last.getValue() - first.getValue(); + tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp()); + + long timesDiff = last.getTimes() - first.getTimes(); + if (timesDiff > 0) { + avgpt = (sum * 1.0d) / timesDiff; + } + } + + statsSnapshot.setSum(sum); + statsSnapshot.setTps(tps); + statsSnapshot.setAvgpt(avgpt); + } + + return statsSnapshot; + } + + public StatsSnapshot getStatsDataInHour() { + return 
computeStatsData(this.csListHour); + } + + public StatsSnapshot getStatsDataInDay() { + return computeStatsData(this.csListDay); + } + + public void init() { + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInSeconds(); + } catch (Throwable e) { + } + } + }, 0, 10, TimeUnit.SECONDS); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInMinutes(); + } catch (Throwable e) { + } + } + }, 0, 10, TimeUnit.MINUTES); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInHour(); + } catch (Throwable e) { + } + } + }, 0, 1, TimeUnit.HOURS); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtMinutes(); + } catch (Throwable ignored) { + } + } + }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtHour(); + } catch (Throwable ignored) { + } + } + }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtDay(); + } catch (Throwable ignored) { + } + } + }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()) - 2000, 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS); + } + + public void samplingInSeconds() { + synchronized (this.csListMinute) { + this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value + .get())); + if (this.csListMinute.size() > 7) { + this.csListMinute.removeFirst(); + } + } + } + + public void samplingInMinutes() { + synchronized (this.csListHour) { + 
this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value + .get())); + if (this.csListHour.size() > 7) { + this.csListHour.removeFirst(); + } + } + } + + public void samplingInHour() { + synchronized (this.csListDay) { + this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value + .get())); + if (this.csListDay.size() > 25) { + this.csListDay.removeFirst(); + } + } + } + + public void printAtMinutes() { + StatsSnapshot ss = computeStatsData(this.csListMinute); + log.info(String.format("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f", + this.statsName, + this.statsKey, + ss.getSum(), + ss.getTps(), + ss.getAvgpt())); + } + + public void printAtHour() { + StatsSnapshot ss = computeStatsData(this.csListHour); + log.info(String.format("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f", + this.statsName, + this.statsKey, + ss.getSum(), + ss.getTps(), + ss.getAvgpt())); + } + + public void printAtDay() { + StatsSnapshot ss = computeStatsData(this.csListDay); + log.info(String.format("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f", + this.statsName, + this.statsKey, + ss.getSum(), + ss.getTps(), + ss.getAvgpt())); + } + + public AtomicLong getValue() { + return value; + } + + + public String getStatsKey() { + return statsKey; + } + + + public String getStatsName() { + return statsName; + } + + + public AtomicLong getTimes() { + return times; + } +} + + +class CallSnapshot { + private final long timestamp; + private final long times; + + private final long value; + + + public CallSnapshot(long timestamp, long times, long value) { + super(); + this.timestamp = timestamp; + this.times = times; + this.value = value; + } + + + public long getTimestamp() { + return timestamp; + } + + + public long getTimes() { + return times; + } + + + public long getValue() { + return value; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java new file mode 100644 index 0000000..8a2b2a1 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common.stats; + +import com.alibaba.rocketmq.common.UtilAll; +import org.slf4j.Logger; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +public class StatsItemSet { + private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable = + new ConcurrentHashMap<String, StatsItem>(128); + + private final String statsName; + private final ScheduledExecutorService scheduledExecutorService; + private final Logger log; + + + public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) { + this.statsName = statsName; + this.scheduledExecutorService = scheduledExecutorService; + this.log = log; + this.init(); + } + + public void init() { + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInSeconds(); + } catch (Throwable e) { + } + } + }, 0, 10, TimeUnit.SECONDS); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInMinutes(); + } catch (Throwable e) { + } + } + }, 0, 10, TimeUnit.MINUTES); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInHour(); + } catch (Throwable e) { + } + } + }, 0, 1, TimeUnit.HOURS); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtMinutes(); + } catch (Throwable e) { + } + } + }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtHour(); + } catch (Throwable e) { + } + } + }, Math.abs(UtilAll.computNextHourTimeMillis() - 
System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtDay(); + } catch (Throwable e) { + } + } + }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS); + } + + private void samplingInSeconds() { + Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, StatsItem> next = it.next(); + next.getValue().samplingInSeconds(); + } + } + + private void samplingInMinutes() { + Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, StatsItem> next = it.next(); + next.getValue().samplingInMinutes(); + } + } + + private void samplingInHour() { + Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, StatsItem> next = it.next(); + next.getValue().samplingInHour(); + } + } + + private void printAtMinutes() { + Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, StatsItem> next = it.next(); + next.getValue().printAtMinutes(); + } + } + + private void printAtHour() { + Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, StatsItem> next = it.next(); + next.getValue().printAtHour(); + } + } + + private void printAtDay() { + Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, StatsItem> next = it.next(); + next.getValue().printAtDay(); + } + } + + public void addValue(final String statsKey, final int incValue, final int incTimes) { + StatsItem statsItem = this.getAndCreateStatsItem(statsKey); + statsItem.getValue().addAndGet(incValue); + 
statsItem.getTimes().addAndGet(incTimes); + } + + public StatsItem getAndCreateStatsItem(final String statsKey) { + StatsItem statsItem = this.statsItemTable.get(statsKey); + if (null == statsItem) { + statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); + StatsItem prev = this.statsItemTable.put(statsKey, statsItem); + + if (null == prev) { + + // statsItem.init(); + } + } + + return statsItem; + } + + public StatsSnapshot getStatsDataInMinute(final String statsKey) { + StatsItem statsItem = this.statsItemTable.get(statsKey); + if (null != statsItem) { + return statsItem.getStatsDataInMinute(); + } + return new StatsSnapshot(); + } + + public StatsSnapshot getStatsDataInHour(final String statsKey) { + StatsItem statsItem = this.statsItemTable.get(statsKey); + if (null != statsItem) { + return statsItem.getStatsDataInHour(); + } + return new StatsSnapshot(); + } + + public StatsSnapshot getStatsDataInDay(final String statsKey) { + StatsItem statsItem = this.statsItemTable.get(statsKey); + if (null != statsItem) { + return statsItem.getStatsDataInDay(); + } + return new StatsSnapshot(); + } + + public StatsItem getStatsItem(final String statsKey) { + return this.statsItemTable.get(statsKey); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java new file mode 100644 index 0000000..4092a2b --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Mutable holder for three derived statistics: a sum, a TPS figure and an average-per-call
 * figure. A freshly created instance reports 0 for all three.
 */
public class StatsSnapshot {
    private long sum;
    private double tps;
    private double avgpt;


    /** @return the stored sum */
    public long getSum() {
        return this.sum;
    }


    /** @param sum the sum to store */
    public void setSum(long sum) {
        this.sum = sum;
    }


    /** @return the stored TPS figure */
    public double getTps() {
        return this.tps;
    }


    /** @param tps the TPS figure to store */
    public void setTps(double tps) {
        this.tps = tps;
    }


    /** @return the stored average-per-call figure */
    public double getAvgpt() {
        return this.avgpt;
    }


    /** @param avgpt the average-per-call figure to store */
    public void setAvgpt(double avgpt) {
        this.avgpt = avgpt;
    }
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.subscription; + +import com.alibaba.rocketmq.common.MixAll; + + +/** + * @author shijia.wxr + */ +public class SubscriptionGroupConfig { + + private String groupName; + + private boolean consumeEnable = true; + private boolean consumeFromMinEnable = true; + + private boolean consumeBroadcastEnable = true; + + private int retryQueueNums = 1; + + private int retryMaxTimes = 16; + + private long brokerId = MixAll.MASTER_ID; + + private long whichBrokerWhenConsumeSlowly = 1; + + private boolean notifyConsumerIdsChangedEnable = true; + + + public String getGroupName() { + return groupName; + } + + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + + public boolean isConsumeEnable() { + return consumeEnable; + } + + + public void setConsumeEnable(boolean consumeEnable) { + this.consumeEnable = consumeEnable; + } + + + public boolean isConsumeFromMinEnable() { + return consumeFromMinEnable; + } + + + public void setConsumeFromMinEnable(boolean consumeFromMinEnable) { + this.consumeFromMinEnable = consumeFromMinEnable; + } + + + public boolean isConsumeBroadcastEnable() { + return consumeBroadcastEnable; + } + + + public void setConsumeBroadcastEnable(boolean consumeBroadcastEnable) { + this.consumeBroadcastEnable 
= consumeBroadcastEnable; + } + + + public int getRetryQueueNums() { + return retryQueueNums; + } + + + public void setRetryQueueNums(int retryQueueNums) { + this.retryQueueNums = retryQueueNums; + } + + + public int getRetryMaxTimes() { + return retryMaxTimes; + } + + + public void setRetryMaxTimes(int retryMaxTimes) { + this.retryMaxTimes = retryMaxTimes; + } + + + public long getBrokerId() { + return brokerId; + } + + + public void setBrokerId(long brokerId) { + this.brokerId = brokerId; + } + + + public long getWhichBrokerWhenConsumeSlowly() { + return whichBrokerWhenConsumeSlowly; + } + + + public void setWhichBrokerWhenConsumeSlowly(long whichBrokerWhenConsumeSlowly) { + this.whichBrokerWhenConsumeSlowly = whichBrokerWhenConsumeSlowly; + } + + public boolean isNotifyConsumerIdsChangedEnable() { + return notifyConsumerIdsChangedEnable; + } + + public void setNotifyConsumerIdsChangedEnable(final boolean notifyConsumerIdsChangedEnable) { + this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (brokerId ^ (brokerId >>> 32)); + result = prime * result + (consumeBroadcastEnable ? 1231 : 1237); + result = prime * result + (consumeEnable ? 1231 : 1237); + result = prime * result + (consumeFromMinEnable ? 1231 : 1237); + result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237); + result = prime * result + ((groupName == null) ? 
0 : groupName.hashCode()); + result = prime * result + retryMaxTimes; + result = prime * result + retryQueueNums; + result = + prime * result + (int) (whichBrokerWhenConsumeSlowly ^ (whichBrokerWhenConsumeSlowly >>> 32)); + return result; + } + + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SubscriptionGroupConfig other = (SubscriptionGroupConfig) obj; + if (brokerId != other.brokerId) + return false; + if (consumeBroadcastEnable != other.consumeBroadcastEnable) + return false; + if (consumeEnable != other.consumeEnable) + return false; + if (consumeFromMinEnable != other.consumeFromMinEnable) + return false; + if (groupName == null) { + if (other.groupName != null) + return false; + } else if (!groupName.equals(other.groupName)) + return false; + if (retryMaxTimes != other.retryMaxTimes) + return false; + if (retryQueueNums != other.retryQueueNums) + return false; + if (whichBrokerWhenConsumeSlowly != other.whichBrokerWhenConsumeSlowly) + return false; + if (notifyConsumerIdsChangedEnable != other.notifyConsumerIdsChangedEnable) + return false; + return true; + } + + + @Override + public String toString() { + return "SubscriptionGroupConfig [groupName=" + groupName + ", consumeEnable=" + consumeEnable + + ", consumeFromMinEnable=" + consumeFromMinEnable + ", consumeBroadcastEnable=" + + consumeBroadcastEnable + ", retryQueueNums=" + retryQueueNums + ", retryMaxTimes=" + + retryMaxTimes + ", brokerId=" + brokerId + ", whichBrokerWhenConsumeSlowly=" + + whichBrokerWhenConsumeSlowly + ", notifyConsumerIdsChangedEnable=" + + notifyConsumerIdsChangedEnable + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java new file mode 100644 index 0000000..2f9d057 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Bit layout of a message's system flag: bit 0 is the compressed flag, bit 1 the multi-tags
 * flag, and bits 2-3 together hold the transaction type.
 *
 * @author shijia.wxr
 */
public class MessageSysFlag {
    public final static int COMPRESSED_FLAG = 1;
    public final static int MULTI_TAGS_FLAG = 1 << 1;
    public final static int TRANSACTION_NOT_TYPE = 0;
    public final static int TRANSACTION_PREPARED_TYPE = 1 << 2;
    public final static int TRANSACTION_COMMIT_TYPE = 2 << 2;
    // Has both transaction bits set, so it doubles as the mask for the transaction field.
    public final static int TRANSACTION_ROLLBACK_TYPE = 3 << 2;


    /**
     * Extracts the transaction type bits from {@code flag}.
     *
     * @param flag a system flag
     * @return {@code flag} with everything but the two transaction bits cleared
     */
    public static int getTransactionValue(final int flag) {
        final int transactionBits = flag & TRANSACTION_ROLLBACK_TYPE;
        return transactionBits;
    }


    /**
     * Replaces the transaction type bits of {@code flag}.
     *
     * @param flag a system flag
     * @param type value OR-ed in after the two transaction bits have been cleared
     * @return the combined flag
     */
    public static int resetTransactionValue(final int flag, final int type) {
        final int withoutTransaction = flag & ~TRANSACTION_ROLLBACK_TYPE;
        return withoutTransaction | type;
    }


    /**
     * @param flag a system flag
     * @return {@code flag} with the compressed bit cleared
     */
    public static int clearCompressedFlag(final int flag) {
        final int keepMask = ~COMPRESSED_FLAG;
        return flag & keepMask;
    }
}
/**
 * Helpers for building and querying the pull system flag, a bit set with four single-bit
 * members: commit-offset (bit 0), suspend (bit 1), subscription (bit 2) and class-filter (bit 3).
 *
 * @author shijia.wxr
 */
public class PullSysFlag {
    private final static int FLAG_COMMIT_OFFSET = 1;
    private final static int FLAG_SUSPEND = 1 << 1;
    private final static int FLAG_SUBSCRIPTION = 1 << 2;
    private final static int FLAG_CLASS_FILTER = 1 << 3;


    /**
     * Builds a flag with exactly the bits whose corresponding argument is {@code true}.
     *
     * @param commitOffset whether to set the commit-offset bit
     * @param suspend whether to set the suspend bit
     * @param subscription whether to set the subscription bit
     * @param classFilter whether to set the class-filter bit
     * @return the combined flag
     */
    public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
                                   final boolean subscription, final boolean classFilter) {
        return (commitOffset ? FLAG_COMMIT_OFFSET : 0)
                | (suspend ? FLAG_SUSPEND : 0)
                | (subscription ? FLAG_SUBSCRIPTION : 0)
                | (classFilter ? FLAG_CLASS_FILTER : 0);
    }


    /**
     * @param sysFlag a pull system flag
     * @return {@code sysFlag} with the commit-offset bit cleared
     */
    public static int clearCommitOffsetFlag(final int sysFlag) {
        final int keepMask = ~FLAG_COMMIT_OFFSET;
        return sysFlag & keepMask;
    }


    /** @return whether the commit-offset bit is set in {@code sysFlag} */
    public static boolean hasCommitOffsetFlag(final int sysFlag) {
        return 0 != (sysFlag & FLAG_COMMIT_OFFSET);
    }


    /** @return whether the suspend bit is set in {@code sysFlag} */
    public static boolean hasSuspendFlag(final int sysFlag) {
        return 0 != (sysFlag & FLAG_SUSPEND);
    }


    /** @return whether the subscription bit is set in {@code sysFlag} */
    public static boolean hasSubscriptionFlag(final int sysFlag) {
        return 0 != (sysFlag & FLAG_SUBSCRIPTION);
    }


    /** @return whether the class-filter bit is set in {@code sysFlag} */
    public static boolean hasClassFilterFlag(final int sysFlag) {
        return 0 != (sysFlag & FLAG_CLASS_FILTER);
    }
}
---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java new file mode 100644 index 0000000..65e3115 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Helpers for the subscription system flag, which currently carries a single "unit" bit (bit 0).
 *
 * @author manhong.yqd
 */
public class SubscriptionSysFlag {

    private final static int FLAG_UNIT = 1;


    /**
     * @param unit whether to set the unit bit
     * @return a flag with only the unit bit set, or 0
     */
    public static int buildSysFlag(final boolean unit) {
        return unit ? FLAG_UNIT : 0;
    }


    /** @return {@code sysFlag} with the unit bit set */
    public static int setUnitFlag(final int sysFlag) {
        return FLAG_UNIT | sysFlag;
    }


    /** @return {@code sysFlag} with the unit bit cleared */
    public static int clearUnitFlag(final int sysFlag) {
        final int keepMask = ~FLAG_UNIT;
        return sysFlag & keepMask;
    }


    /** @return whether the unit bit is set in {@code sysFlag} */
    public static boolean hasUnitFlag(final int sysFlag) {
        return 0 != (sysFlag & FLAG_UNIT);
    }


    /** Intentionally empty entry point. */
    public static void main(String[] args) {
    }
}
/**
 * Helpers for the topic system flag, a bit set with two single-bit members: "unit" (bit 0) and
 * "unit sub" (bit 1).
 *
 * @author manhong.yqd
 */
public class TopicSysFlag {

    private final static int FLAG_UNIT = 1;

    private final static int FLAG_UNIT_SUB = 1 << 1;


    /**
     * Builds a flag with exactly the bits whose corresponding argument is {@code true}.
     *
     * @param unit whether to set the unit bit
     * @param hasUnitSub whether to set the unit-sub bit
     * @return the combined flag
     */
    public static int buildSysFlag(final boolean unit, final boolean hasUnitSub) {
        return (unit ? FLAG_UNIT : 0) | (hasUnitSub ? FLAG_UNIT_SUB : 0);
    }


    /** @return {@code sysFlag} with the unit bit set */
    public static int setUnitFlag(final int sysFlag) {
        return FLAG_UNIT | sysFlag;
    }


    /** @return {@code sysFlag} with the unit bit cleared */
    public static int clearUnitFlag(final int sysFlag) {
        final int keepMask = ~FLAG_UNIT;
        return sysFlag & keepMask;
    }


    /** @return whether the unit bit is set in {@code sysFlag} */
    public static boolean hasUnitFlag(final int sysFlag) {
        return 0 != (sysFlag & FLAG_UNIT);
    }


    /** @return {@code sysFlag} with the unit-sub bit set */
    public static int setUnitSubFlag(final int sysFlag) {
        return FLAG_UNIT_SUB | sysFlag;
    }


    /** @return {@code sysFlag} with the unit-sub bit cleared */
    public static int clearUnitSubFlag(final int sysFlag) {
        final int keepMask = ~FLAG_UNIT_SUB;
        return sysFlag & keepMask;
    }


    /** @return whether the unit-sub bit is set in {@code sysFlag} */
    public static boolean hasUnitSubFlag(final int sysFlag) {
        return 0 != (sysFlag & FLAG_UNIT_SUB);
    }


    /** Intentionally empty entry point. */
    public static void main(String[] args) {
    }
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.utils; + +import io.netty.channel.Channel; + +import java.net.InetAddress; +import java.net.InetSocketAddress; + +public class ChannelUtil { + public static String getRemoteIp(Channel channel) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.remoteAddress(); + if (inetSocketAddress == null) { + return ""; + } + final InetAddress inetAddr = inetSocketAddress.getAddress(); + return inetAddr != null ? inetAddr.getHostAddress() : inetSocketAddress.getHostName(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java new file mode 100755 index 0000000..dadac46 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.utils; + +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.MixAll; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.util.Iterator; +import java.util.List; + + +public class HttpTinyClient { + + static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues, + String encoding, long readTimeoutMs) throws IOException { + String encodedContent = encodingParams(paramValues, encoding); + url += (null == encodedContent) ? "" : ("?" 
+ encodedContent); + + HttpURLConnection conn = null; + try { + conn = (HttpURLConnection) new URL(url).openConnection(); + conn.setRequestMethod("GET"); + conn.setConnectTimeout((int) readTimeoutMs); + conn.setReadTimeout((int) readTimeoutMs); + setHeaders(conn, headers, encoding); + + conn.connect(); + int respCode = conn.getResponseCode(); + String resp = null; + + if (HttpURLConnection.HTTP_OK == respCode) { + resp = IOTinyUtils.toString(conn.getInputStream(), encoding); + } else { + resp = IOTinyUtils.toString(conn.getErrorStream(), encoding); + } + return new HttpResult(respCode, resp); + } finally { + if (conn != null) { + conn.disconnect(); + } + } + } + + static private String encodingParams(List<String> paramValues, String encoding) + throws UnsupportedEncodingException { + StringBuilder sb = new StringBuilder(); + if (null == paramValues) { + return null; + } + + for (Iterator<String> iter = paramValues.iterator(); iter.hasNext(); ) { + sb.append(iter.next()).append("="); + sb.append(URLEncoder.encode(iter.next(), encoding)); + if (iter.hasNext()) { + sb.append("&"); + } + } + return sb.toString(); + } + + static private void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) { + if (null != headers) { + for (Iterator<String> iter = headers.iterator(); iter.hasNext(); ) { + conn.addRequestProperty(iter.next(), iter.next()); + } + } + conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); + conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding); + + + String ts = String.valueOf(System.currentTimeMillis()); + conn.addRequestProperty("Metaq-Client-RequestTS", ts); + } + + /** + + * + * @param url + * @param headers + + * @param paramValues + + * @param encoding + + * @param readTimeoutMs + + * + * @return the http response of given http post request + * + * @throws java.io.IOException + */ + static public HttpResult httpPost(String url, List<String> 
headers, List<String> paramValues, + String encoding, long readTimeoutMs) throws IOException { + String encodedContent = encodingParams(paramValues, encoding); + + HttpURLConnection conn = null; + try { + conn = (HttpURLConnection) new URL(url).openConnection(); + conn.setRequestMethod("POST"); + conn.setConnectTimeout(3000); + conn.setReadTimeout((int) readTimeoutMs); + conn.setDoOutput(true); + conn.setDoInput(true); + setHeaders(conn, headers, encoding); + + conn.getOutputStream().write(encodedContent.getBytes(MixAll.DEFAULT_CHARSET)); + + int respCode = conn.getResponseCode(); + String resp = null; + + if (HttpURLConnection.HTTP_OK == respCode) { + resp = IOTinyUtils.toString(conn.getInputStream(), encoding); + } else { + resp = IOTinyUtils.toString(conn.getErrorStream(), encoding); + } + return new HttpResult(respCode, resp); + } finally { + if (null != conn) { + conn.disconnect(); + } + } + } + + static public class HttpResult { + final public int code; + final public String content; + + + public HttpResult(int code, String content) { + this.code = code; + this.content = content; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java new file mode 100644 index 0000000..ced2fae --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.utils; + +import com.alibaba.rocketmq.remoting.common.RemotingHelper; + +import java.io.*; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.List; + + +/** + * @author manhong.yqd + */ +public class IOTinyUtils { + + static public String toString(InputStream input, String encoding) throws IOException { + return (null == encoding) ? toString(new InputStreamReader(input, RemotingHelper.DEFAULT_CHARSET)) : toString(new InputStreamReader( + input, encoding)); + } + + + static public String toString(Reader reader) throws IOException { + CharArrayWriter sw = new CharArrayWriter(); + copy(reader, sw); + return sw.toString(); + } + + + static public long copy(Reader input, Writer output) throws IOException { + char[] buffer = new char[1 << 12]; + long count = 0; + for (int n = 0; (n = input.read(buffer)) >= 0; ) { + output.write(buffer, 0, n); + count += n; + } + return count; + } + + + /** + + */ + static public List<String> readLines(Reader input) throws IOException { + BufferedReader reader = toBufferedReader(input); + List<String> list = new ArrayList<String>(); + String line = null; + for (;;) { + line = reader.readLine(); + if (null != line) { + list.add(line); + } else { + break; + } + } + return list; + } + + + static private BufferedReader toBufferedReader(Reader reader) { + return reader instanceof 
BufferedReader ? (BufferedReader) reader : new BufferedReader(reader); + } + + + static public void copyFile(String source, String target) throws IOException { + File sf = new File(source); + if (!sf.exists()) { + throw new IllegalArgumentException("source file does not exist."); + } + File tf = new File(target); + tf.getParentFile().mkdirs(); + if (!tf.exists() && !tf.createNewFile()) { + throw new RuntimeException("failed to create target file."); + } + + FileChannel sc = null; + FileChannel tc = null; + try { + tc = new FileOutputStream(tf).getChannel(); + sc = new FileInputStream(sf).getChannel(); + sc.transferTo(0, sc.size(), tc); + } finally { + if (null != sc) { + sc.close(); + } + if (null != tc) { + tc.close(); + } + } + } + + + public static void delete(File fileOrDir) throws IOException { + if (fileOrDir == null) { + return; + } + + if (fileOrDir.isDirectory()) { + cleanDirectory(fileOrDir); + } + + fileOrDir.delete(); + } + + + /** + + */ + public static void cleanDirectory(File directory) throws IOException { + if (!directory.exists()) { + String message = directory + " does not exist"; + throw new IllegalArgumentException(message); + } + + if (!directory.isDirectory()) { + String message = directory + " is not a directory"; + throw new IllegalArgumentException(message); + } + + File[] files = directory.listFiles(); + if (files == null) { // null if security restricted + throw new IOException("Failed to list contents of " + directory); + } + + IOException exception = null; + for (File file : files) { + try { + delete(file); + } catch (IOException ioe) { + exception = ioe; + } + } + + if (null != exception) { + throw exception; + } + } + + + public static void writeStringToFile(File file, String data, String encoding) throws IOException { + OutputStream os = null; + try { + os = new FileOutputStream(file); + os.write(data.getBytes(encoding)); + } finally { + if (null != os) { + os.close(); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java new file mode 100644 index 0000000..72e02d0 --- /dev/null +++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common; + +import junit.framework.Assert; +import org.junit.Test; + +import java.net.InetAddress; +import java.util.List; + + +/** + * @author lansheng.zj + */ +public class MixAllTest { + + @Test + public void test() throws Exception { + List<String> localInetAddress = MixAll.getLocalInetAddress(); + String local = InetAddress.getLocalHost().getHostAddress(); + Assert.assertTrue(localInetAddress.contains("127.0.0.1")); + Assert.assertTrue(localInetAddress.contains(local)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java new file mode 100644 index 0000000..e6468b9 --- /dev/null +++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.common; + +import com.alibaba.rocketmq.remoting.common.RemotingUtil; +import org.junit.Test; + + +public class RemotingUtilTest { + @Test + public void test() throws Exception { + String a = RemotingUtil.getLocalAddress(); + System.out.println(a); + } +}
