http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java new file mode 100644 index 0000000..db50672 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.common.message; + +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + + +/** + * @author shijia.wxr + */ +public class MessageExt extends Message { + private static final long serialVersionUID = 5720810158625748049L; + + private int queueId; + + private int storeSize; + + private long queueOffset; + private int sysFlag; + private long bornTimestamp; + private SocketAddress bornHost; + + private long storeTimestamp; + private SocketAddress storeHost; + private String msgId; + private long commitLogOffset; + private int bodyCRC; + private int reconsumeTimes; + + private long preparedTransactionOffset; + + + public MessageExt() { + } + + + public MessageExt(int queueId, long bornTimestamp, SocketAddress bornHost, long storeTimestamp, + SocketAddress storeHost, String msgId) { + this.queueId = queueId; + this.bornTimestamp = bornTimestamp; + this.bornHost = bornHost; + this.storeTimestamp = storeTimestamp; + this.storeHost = storeHost; + this.msgId = msgId; + } + + public static TopicFilterType parseTopicFilterType(final int sysFlag) { + if ((sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG) { + return TopicFilterType.MULTI_TAG; + } + + return TopicFilterType.SINGLE_TAG; + } + + public ByteBuffer getBornHostBytes() { + return socketAddress2ByteBuffer(this.bornHost); + } + + public ByteBuffer getBornHostBytes(ByteBuffer byteBuffer) { + return socketAddress2ByteBuffer(this.bornHost, byteBuffer); + } + + private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4); + byteBuffer.putInt(inetSocketAddress.getPort()); + byteBuffer.flip(); + return byteBuffer; + } 
+ + public static ByteBuffer socketAddress2ByteBuffer(SocketAddress socketAddress) { + ByteBuffer byteBuffer = ByteBuffer.allocate(8); + return socketAddress2ByteBuffer(socketAddress, byteBuffer); + } + + public ByteBuffer getStoreHostBytes() { + return socketAddress2ByteBuffer(this.storeHost); + } + + public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) { + return socketAddress2ByteBuffer(this.storeHost, byteBuffer); + } + + public int getQueueId() { + return queueId; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + public long getBornTimestamp() { + return bornTimestamp; + } + + public void setBornTimestamp(long bornTimestamp) { + this.bornTimestamp = bornTimestamp; + } + + public SocketAddress getBornHost() { + return bornHost; + } + + public void setBornHost(SocketAddress bornHost) { + this.bornHost = bornHost; + } + + public String getBornHostString() { + if (this.bornHost != null) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost; + return inetSocketAddress.getAddress().getHostAddress(); + } + + return null; + } + + public String getBornHostNameString() { + if (this.bornHost != null) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost; + return inetSocketAddress.getAddress().getHostName(); + } + + return null; + } + + public long getStoreTimestamp() { + return storeTimestamp; + } + + public void setStoreTimestamp(long storeTimestamp) { + this.storeTimestamp = storeTimestamp; + } + + public SocketAddress getStoreHost() { + return storeHost; + } + + public void setStoreHost(SocketAddress storeHost) { + this.storeHost = storeHost; + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public int getSysFlag() { + return sysFlag; + } + + public void setSysFlag(int sysFlag) { + this.sysFlag = sysFlag; + } + + public int getBodyCRC() { + return bodyCRC; + } + + public void setBodyCRC(int bodyCRC) { + 
this.bodyCRC = bodyCRC; + } + + public long getQueueOffset() { + return queueOffset; + } + + public void setQueueOffset(long queueOffset) { + this.queueOffset = queueOffset; + } + + public long getCommitLogOffset() { + return commitLogOffset; + } + + public void setCommitLogOffset(long physicOffset) { + this.commitLogOffset = physicOffset; + } + + public int getStoreSize() { + return storeSize; + } + + public void setStoreSize(int storeSize) { + this.storeSize = storeSize; + } + + public int getReconsumeTimes() { + return reconsumeTimes; + } + + + public void setReconsumeTimes(int reconsumeTimes) { + this.reconsumeTimes = reconsumeTimes; + } + + + public long getPreparedTransactionOffset() { + return preparedTransactionOffset; + } + + + public void setPreparedTransactionOffset(long preparedTransactionOffset) { + this.preparedTransactionOffset = preparedTransactionOffset; + } + + + @Override + public String toString() { + return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset + + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost + + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId + + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes=" + + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset + + ", toString()=" + super.toString() + "]"; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageId.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageId.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageId.java new file mode 100644 index 0000000..95fe2f9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageId.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Mutable pair of a socket address and a long offset.
 *
 * @author shijia.wxr
 */
public class MessageId {
    private SocketAddress address;
    private long offset;

    /**
     * @param address the socket address to hold
     * @param offset  the offset to hold
     */
    public MessageId(SocketAddress address, long offset) {
        this.address = address;
        this.offset = offset;
    }

    /** @return the currently held socket address */
    public SocketAddress getAddress() {
        return this.address;
    }

    /** @param address the socket address to hold from now on */
    public void setAddress(SocketAddress address) {
        this.address = address;
    }

    /** @return the currently held offset */
    public long getOffset() {
        return this.offset;
    }

    /** @param offset the offset to hold from now on */
    public void setOffset(long offset) {
        this.offset = offset;
    }
}
/**
 * Identifies a queue by topic, broker name and queue id.
 * <p>
 * {@link #equals(Object)} and {@link #hashCode()} use all three fields and
 * tolerate {@code null} topic / broker name. {@link #compareTo(MessageQueue)}
 * orders by topic, then broker name, then queue id, and requires topic and
 * broker name to be non-null on {@code this}.
 *
 * @author shijia.wxr
 */
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic;
    private String brokerName;
    private int queueId;

    /** Creates a queue whose topic and broker name are {@code null} and whose queue id is 0. */
    public MessageQueue() {

    }

    /**
     * @param topic      topic of the queue
     * @param brokerName name of the broker of the queue
     * @param queueId    numeric id of the queue
     */
    public MessageQueue(String topic, String brokerName, int queueId) {
        this.topic = topic;
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getBrokerName() {
        return brokerName;
    }

    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }

    public int getQueueId() {
        return queueId;
    }

    public void setQueueId(int queueId) {
        this.queueId = queueId;
    }

    /** Combines broker name, queue id and topic with the usual 31-multiplier scheme. */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
        result = prime * result + queueId;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        return result;
    }

    /**
     * @return {@code true} when {@code obj} is of exactly this class and has the
     *         same broker name, queue id and topic (null-safe)
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final MessageQueue other = (MessageQueue) obj;
        return sameText(brokerName, other.brokerName)
            && queueId == other.queueId
            && sameText(topic, other.topic);
    }

    /** Null-safe string equality. */
    private static boolean sameText(final String left, final String right) {
        return (left == null) ? (right == null) : left.equals(right);
    }

    @Override
    public String toString() {
        return "MessageQueue [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId + "]";
    }

    /**
     * Orders by topic, then broker name, then queue id.
     *
     * @param o the queue to compare against
     * @return a negative value, zero or a positive value when this queue sorts
     *         before, equal to or after {@code o}
     * @throws NullPointerException if {@code o} is null, or this queue's topic or broker name is null
     */
    @Override
    public int compareTo(MessageQueue o) {
        int result = this.topic.compareTo(o.topic);
        if (result != 0) {
            return result;
        }

        result = this.brokerName.compareTo(o.brokerName);
        if (result != 0) {
            return result;
        }

        // Compare explicitly: "this.queueId - o.queueId" overflows when the ids are
        // far apart (e.g. Integer.MIN_VALUE vs 1) and would report the wrong order.
        if (this.queueId < o.queueId) {
            return -1;
        }
        return (this.queueId == o.queueId) ? 0 : 1;
    }
}
/**
 * Identifies a queue by topic, broker name and queue id, and additionally
 * carries a long offset.
 * <p>
 * {@link #equals(Object)} compares all four fields; {@link #hashCode()} uses
 * topic, broker name and queue id only (equal objects still hash equally).
 * {@link #compareTo(MessageQueueForC)} orders by topic, broker name, queue id
 * and finally offset, and requires topic and broker name to be non-null on
 * {@code this}.
 *
 * @author lansheng.zj
 */
public class MessageQueueForC implements Comparable<MessageQueueForC>, Serializable {

    private static final long serialVersionUID = 5320967846569962104L;
    private String topic;
    private String brokerName;
    private int queueId;
    private long offset;

    /**
     * @param topic      topic of the queue
     * @param brokerName name of the broker of the queue
     * @param queueId    numeric id of the queue
     * @param offset     offset carried along with the queue
     */
    public MessageQueueForC(String topic, String brokerName, int queueId, long offset) {
        this.topic = topic;
        this.brokerName = brokerName;
        this.queueId = queueId;
        this.offset = offset;
    }

    /**
     * Orders by topic, then broker name, then queue id, then offset.
     *
     * @param o the queue to compare against
     * @return -1, 0 or 1 for the numeric tie-breakers; the raw
     *         {@link String#compareTo(String)} result when topic or broker name differ
     * @throws NullPointerException if {@code o} is null, or this queue's topic or broker name is null
     */
    @Override
    public int compareTo(MessageQueueForC o) {
        int result = this.topic.compareTo(o.topic);
        if (result != 0) {
            return result;
        }
        result = this.brokerName.compareTo(o.brokerName);
        if (result != 0) {
            return result;
        }
        // Explicit comparisons instead of subtraction: both "queueId - o.queueId" (int)
        // and "offset - o.offset" (long) overflow for operands far apart and would
        // then report the wrong order.
        if (this.queueId != o.queueId) {
            return (this.queueId < o.queueId) ? -1 : 1;
        }
        if (this.offset != o.offset) {
            return (this.offset < o.offset) ? -1 : 1;
        }
        return 0;
    }

    /** Combines broker name, queue id and topic with the usual 31-multiplier scheme. */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
        result = prime * result + queueId;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        return result;
    }

    /**
     * @return {@code true} when {@code obj} is of exactly this class and has the
     *         same broker name, queue id, topic (null-safe) and offset
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final MessageQueueForC other = (MessageQueueForC) obj;
        return sameText(brokerName, other.brokerName)
            && queueId == other.queueId
            && sameText(topic, other.topic)
            && offset == other.offset;
    }

    /** Null-safe string equality. */
    private static boolean sameText(final String left, final String right) {
        return (left == null) ? (right == null) : left.equals(right);
    }

    @Override
    public String toString() {
        return "MessageQueueForC [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId
            + ", offset=" + offset + "]";
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getBrokerName() {
        return brokerName;
    }

    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }

    public int getQueueId() {
        return queueId;
    }

    public void setQueueId(int queueId) {
        this.queueId = queueId;
    }

    public long getOffset() {
        return offset;
    }

    public void setOffset(long offset) {
        this.offset = offset;
    }
}
/**
 * Enumerates the message kinds known to this module.
 * <p>
 * NOTE(review): the meanings below are read off the constant names only
 * (normal, half/prepared transactional, committed transactional, delayed);
 * confirm against the code that produces and consumes these values.
 * The constant names and their declaration order are part of the public
 * contract ({@code valueOf}, {@code ordinal}) and must not change.
 */
public enum MessageType {
    /** Presumably an ordinary message. */
    Normal_Msg,

    /** Presumably the "half" stage of a transactional message. */
    Trans_Msg_Half,

    /** Presumably the commit stage of a transactional message (note the lower-case "msg" in the name). */
    Trans_msg_Commit,

    /** Presumably a delayed message. */
    Delay_Msg
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: NamesrvConfig.java 1839 2013-05-16 02:12:02Z shijia.wxr $ + */ +package org.apache.rocketmq.common.namesrv; + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + + +/** + * + * @author shijia.wxr + * @author lansheng.zj + */ +public class NamesrvConfig { + private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); + private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + + private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; + private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; + private String productEnvName = "center"; + private boolean clusterTest = false; + private boolean orderMessageEnable = false; + + public boolean isOrderMessageEnable() { + return orderMessageEnable; + } + + public void setOrderMessageEnable(boolean orderMessageEnable) { + this.orderMessageEnable = orderMessageEnable; + } + + public String getRocketmqHome() { + return rocketmqHome; + } + + + public void setRocketmqHome(String rocketmqHome) { + this.rocketmqHome = rocketmqHome; + } + + + public String getKvConfigPath() { + return kvConfigPath; + } + + + public void setKvConfigPath(String kvConfigPath) { + this.kvConfigPath = kvConfigPath; + } + + + public String 
getProductEnvName() { + return productEnvName; + } + + + public void setProductEnvName(String productEnvName) { + this.productEnvName = productEnvName; + } + + + public boolean isClusterTest() { + return clusterTest; + } + + + public void setClusterTest(boolean clusterTest) { + this.clusterTest = clusterTest; + } + + public String getConfigStorePath() { + return configStorePath; + } + + public void setConfigStorePath(final String configStorePath) { + this.configStorePath = configStorePath; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvUtil.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvUtil.java new file mode 100644 index 0000000..fb854f8 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvUtil.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Holder for name-server related constants.
 *
 * @author shijia.wxr
 */
public class NamesrvUtil {
    /**
     * The literal {@code "ORDER_TOPIC_CONFIG"}.
     * NOTE(review): by its name this is the KV-config namespace for order-topic
     * settings; confirm against the callers.
     */
    public static final String NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG";
}
+ */ + +package org.apache.rocketmq.common.namesrv; + +import org.apache.rocketmq.common.protocol.body.KVTable; + + +/** + * @author shijia.wxr + */ +public class RegisterBrokerResult { + private String haServerAddr; + private String masterAddr; + private KVTable kvTable; + + + public String getHaServerAddr() { + return haServerAddr; + } + + + public void setHaServerAddr(String haServerAddr) { + this.haServerAddr = haServerAddr; + } + + + public String getMasterAddr() { + return masterAddr; + } + + + public void setMasterAddr(String masterAddr) { + this.masterAddr = masterAddr; + } + + + public KVTable getKvTable() { + return kvTable; + } + + + public void setKvTable(KVTable kvTable) { + this.kvTable = kvTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java new file mode 100644 index 0000000..5836c05 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: TopAddressing.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package org.apache.rocketmq.common.namesrv; + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.utils.HttpTinyClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + + +/** + * @author shijia.wxr + * @author manhong.yqd + */ +public class TopAddressing { + private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private String nsAddr; + private String wsAddr; + private String unitName; + + + public TopAddressing(final String wsAddr) { + this(wsAddr, null); + } + + + public TopAddressing(final String wsAddr, final String unitName) { + this.wsAddr = wsAddr; + this.unitName = unitName; + } + + public final String fetchNSAddr() { + return fetchNSAddr(true, 3000); + } + + public final String fetchNSAddr(boolean verbose, long timeoutMills) { + String url = this.wsAddr; + try { + if (!UtilAll.isBlank(this.unitName)) { + url = url + "-" + this.unitName + "?nofix=1"; + } + HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills); + if (200 == result.code) { + String responseStr = result.content; + if (responseStr != null) { + return clearNewLine(responseStr); + } else { + log.error("fetch nameserver address is null"); + } + } else { + log.error("fetch nameserver address failed. 
statusCode={}", result.code); + } + } catch (IOException e) { + if (verbose) { + log.error("fetch name server address exception", e); + } + } + + if (verbose) { + String errorMsg = + "connect to " + url + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + " not bind in /etc/hosts"; + errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL); + + log.warn(errorMsg); + } + return null; + } + + private static String clearNewLine(final String str) { + String newString = str.trim(); + int index = newString.indexOf("\r"); + if (index != -1) { + return newString.substring(0, index); + } + + index = newString.indexOf("\n"); + if (index != -1) { + return newString.substring(0, index); + } + + return newString; + } + + public String getNsAddr() { + return nsAddr; + } + + + public void setNsAddr(String nsAddr) { + this.nsAddr = nsAddr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java new file mode 100644 index 0000000..44e2e4f --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol; + +import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + + +/** + * @author shijia.wxr + */ +public class MQProtosHelper { + public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr, + final long timeoutMillis) { + RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); + requestHeader.setBrokerAddr(brokerAddr); + + RemotingCommand request = + RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); + + try { + RemotingCommand response = RemotingHelper.invokeSync(nsaddr, request, timeoutMillis); + if (response != null) { + return ResponseCode.SUCCESS == response.getCode(); + } + } catch (RemotingConnectException e) { + e.printStackTrace(); + } catch (RemotingSendRequestException e) { + e.printStackTrace(); + } catch (RemotingTimeoutException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java 
---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java new file mode 100644 index 0000000..d878726 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Holder for the integer request codes declared in
 * {@code org.apache.rocketmq.common.protocol}.
 *
 * <p>All members are compile-time {@code int} constants. They are listed here in
 * ascending numeric order; the numbering is not contiguous (for example 16, 24,
 * 44, 217 and 218 are not assigned in this class).
 */
public class RequestCode {

    // ---------------------------------------------------------------- 10 .. 45
    public static final int SEND_MESSAGE = 10;
    public static final int PULL_MESSAGE = 11;
    public static final int QUERY_MESSAGE = 12;
    public static final int QUERY_BROKER_OFFSET = 13;
    public static final int QUERY_CONSUMER_OFFSET = 14;
    public static final int UPDATE_CONSUMER_OFFSET = 15;
    public static final int UPDATE_AND_CREATE_TOPIC = 17;
    public static final int GET_ALL_TOPIC_CONFIG = 21;
    public static final int GET_TOPIC_CONFIG_LIST = 22;
    public static final int GET_TOPIC_NAME_LIST = 23;
    public static final int UPDATE_BROKER_CONFIG = 25;
    public static final int GET_BROKER_CONFIG = 26;
    public static final int TRIGGER_DELETE_FILES = 27;
    public static final int GET_BROKER_RUNTIME_INFO = 28;
    public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
    public static final int GET_MAX_OFFSET = 30;
    public static final int GET_MIN_OFFSET = 31;
    public static final int GET_EARLIEST_MSG_STORETIME = 32;
    public static final int VIEW_MESSAGE_BY_ID = 33;
    public static final int HEART_BEAT = 34;
    public static final int UNREGISTER_CLIENT = 35;
    public static final int CONSUMER_SEND_MSG_BACK = 36;
    public static final int END_TRANSACTION = 37;
    public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
    public static final int CHECK_TRANSACTION_STATE = 39;
    public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
    public static final int LOCK_BATCH_MQ = 41;
    public static final int UNLOCK_BATCH_MQ = 42;
    public static final int GET_ALL_CONSUMER_OFFSET = 43;
    public static final int GET_ALL_DELAY_OFFSET = 45;

    // -------------------------------------------------------------- 100 .. 106
    public static final int PUT_KV_CONFIG = 100;
    public static final int GET_KV_CONFIG = 101;
    public static final int DELETE_KV_CONFIG = 102;
    public static final int REGISTER_BROKER = 103;
    public static final int UNREGISTER_BROKER = 104;
    public static final int GET_ROUTEINTO_BY_TOPIC = 105;
    public static final int GET_BROKER_CLUSTER_INFO = 106;

    // -------------------------------------------------------------- 200 .. 224
    public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
    public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
    public static final int GET_TOPIC_STATS_INFO = 202;
    public static final int GET_CONSUMER_CONNECTION_LIST = 203;
    public static final int GET_PRODUCER_CONNECTION_LIST = 204;
    public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
    public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
    public static final int DELETE_SUBSCRIPTIONGROUP = 207;
    public static final int GET_CONSUME_STATS = 208;
    public static final int SUSPEND_CONSUMER = 209;
    public static final int RESUME_CONSUMER = 210;
    public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
    public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;
    public static final int ADJUST_CONSUMER_THREAD_POOL = 213;
    public static final int WHO_CONSUME_THE_MESSAGE = 214;
    public static final int DELETE_TOPIC_IN_BROKER = 215;
    public static final int DELETE_TOPIC_IN_NAMESRV = 216;
    public static final int GET_KVLIST_BY_NAMESPACE = 219;
    public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
    public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
    public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
    public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
    public static final int GET_TOPICS_BY_CLUSTER = 224;

    // -------------------------------------------------------------- 300 .. 319
    public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
    public static final int REGISTER_FILTER_SERVER = 301;
    public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
    public static final int QUERY_CONSUME_TIME_SPAN = 303;
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
    public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
    public static final int GET_CONSUMER_RUNNING_INFO = 307;
    public static final int QUERY_CORRECTION_OFFSET = 308;
    public static final int CONSUME_MESSAGE_DIRECTLY = 309;
    public static final int SEND_MESSAGE_V2 = 310;
    public static final int GET_UNIT_TOPIC_LIST = 311;
    public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
    public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
    public static final int CLONE_GROUP_OFFSET = 314;
    public static final int VIEW_BROKER_STATS_DATA = 315;
    public static final int CLEAN_UNUSED_TOPIC = 316;
    public static final int GET_BROKER_CONSUME_STATS = 317;

    /**
     * update the config of name server
     */
    public static final int UPDATE_NAMESRV_CONFIG = 318;

    /**
     * get config from name server
     */
    public static final int GET_NAMESRV_CONFIG = 319;
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol; + +import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; + + +public class ResponseCode extends RemotingSysResponseCode { + + public static final int FLUSH_DISK_TIMEOUT = 10; + + public static final int SLAVE_NOT_AVAILABLE = 11; + + public static final int FLUSH_SLAVE_TIMEOUT = 12; + + public static final int MESSAGE_ILLEGAL = 13; + + public static final int SERVICE_NOT_AVAILABLE = 14; + + public static final int VERSION_NOT_SUPPORTED = 15; + + public static final int NO_PERMISSION = 16; + + public static final int TOPIC_NOT_EXIST = 17; + public static final int TOPIC_EXIST_ALREADY = 18; + public static final int PULL_NOT_FOUND = 19; + + public static final int PULL_RETRY_IMMEDIATELY = 20; + + public static final int PULL_OFFSET_MOVED = 21; + + public static final int QUERY_NOT_FOUND = 22; + + public static final int SUBSCRIPTION_PARSE_FAILED = 23; + + public static final int SUBSCRIPTION_NOT_EXIST = 24; + + public static final int SUBSCRIPTION_NOT_LATEST = 25; + + public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26; + + public static final int TRANSACTION_SHOULD_COMMIT = 200; + + public static final int TRANSACTION_SHOULD_ROLLBACK = 201; + + public static final int TRANSACTION_STATE_UNKNOW = 202; + + public static final int TRANSACTION_STATE_GROUP_WRONG = 203; + public static final int NO_BUYER_ID = 204; + + + public static final int NOT_IN_CURRENT_UNIT = 205; + + + public static final int CONSUMER_NOT_ONLINE = 206; + + + public static final int CONSUME_MSG_TIMEOUT = 207; + + + public static final int NO_MESSAGE = 208; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java new file mode 100644 index 0000000..f1ac124 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + + +public class BrokerStatsData extends RemotingSerializable { + + private BrokerStatsItem statsMinute; + + private BrokerStatsItem statsHour; + + private BrokerStatsItem statsDay; + + + public BrokerStatsItem getStatsMinute() { + return statsMinute; + } + + + public void setStatsMinute(BrokerStatsItem statsMinute) { + this.statsMinute = statsMinute; + } + + + public BrokerStatsItem getStatsHour() { + return statsHour; + } + + + public void setStatsHour(BrokerStatsItem statsHour) { + this.statsHour = statsHour; + } + + + public BrokerStatsItem getStatsDay() { + return statsDay; + } + + + public void setStatsDay(BrokerStatsItem statsDay) { + this.statsDay = statsDay; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsItem.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsItem.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsItem.java new file mode 100644 index 0000000..904770f --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsItem.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Plain mutable value holder with three numeric fields: {@code sum},
 * {@code tps} and {@code avgpt}.
 *
 * <p>NOTE(review): the exact meaning of {@code avgpt} is not visible in this
 * file — presumably an average per sampling period; confirm with the producer
 * of these values.
 */
public class BrokerStatsItem {
    private long sum;
    private double tps;
    private double avgpt;


    /** @return the current {@code sum}; {@code 0} until set */
    public long getSum() {
        return this.sum;
    }


    /** @param sum new value for {@code sum} */
    public void setSum(long sum) {
        this.sum = sum;
    }


    /** @return the current {@code tps}; {@code 0.0} until set */
    public double getTps() {
        return this.tps;
    }


    /** @param tps new value for {@code tps} */
    public void setTps(double tps) {
        this.tps = tps;
    }


    /** @return the current {@code avgpt}; {@code 0.0} until set */
    public double getAvgpt() {
        return this.avgpt;
    }


    /** @param avgpt new value for {@code avgpt} */
    public void setAvgpt(double avgpt) {
        this.avgpt = avgpt;
    }
}
/**
 * Result codes used as the type of
 * {@code ConsumeMessageDirectlyResult.consumeResult}.
 *
 * <p>The declaration order (and therefore each constant's ordinal) is kept
 * exactly as originally declared.
 */
public enum CMResult {
    CR_SUCCESS,
    CR_LATER,
    CR_ROLLBACK,
    CR_COMMIT,
    CR_THROW_EXCEPTION,
    CR_RETURN_NULL
}
+ */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class ClusterInfo extends RemotingSerializable { + private HashMap<String/* brokerName */, BrokerData> brokerAddrTable; + private HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; + + + public HashMap<String, BrokerData> getBrokerAddrTable() { + return brokerAddrTable; + } + + + public void setBrokerAddrTable(HashMap<String, BrokerData> brokerAddrTable) { + this.brokerAddrTable = brokerAddrTable; + } + + + public HashMap<String, Set<String>> getClusterAddrTable() { + return clusterAddrTable; + } + + + public void setClusterAddrTable(HashMap<String, Set<String>> clusterAddrTable) { + this.clusterAddrTable = clusterAddrTable; + } + + + public String[] retrieveAllAddrByCluster(String cluster) { + List<String> addrs = new ArrayList<String>(); + if (clusterAddrTable.containsKey(cluster)) { + Set<String> brokerNames = clusterAddrTable.get(cluster); + for (String brokerName : brokerNames) { + BrokerData brokerData = brokerAddrTable.get(brokerName); + if (null != brokerData) { + addrs.addAll(brokerData.getBrokerAddrs().values()); + } + } + } + + return addrs.toArray(new String[]{}); + } + + + public String[] retrieveAllClusterNames() { + return clusterAddrTable.keySet().toArray(new String[]{}); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java new file mode 100644 
index 0000000..ed8d9b4 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.remoting.protocol.LanguageCode; + + +/** + * @author shijia.wxr + */ +public class Connection { + private String clientId; + private String clientAddr; + private LanguageCode language; + private int version; + + + public String getClientId() { + return clientId; + } + + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + + public String getClientAddr() { + return clientAddr; + } + + + public void setClientAddr(String clientAddr) { + this.clientAddr = clientAddr; + } + + + public LanguageCode getLanguage() { + return language; + } + + + public void setLanguage(LanguageCode language) { + this.language = language; + } + + + public int getVersion() { + return version; + } + + + public void setVersion(int version) { + this.version = version; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java 
---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java new file mode 100644 index 0000000..e6d2cad --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashSet; + + +/** + * @author shijia.wxr + * + */ +public class ConsumeByWho extends RemotingSerializable { + private HashSet<String> consumedGroup = new HashSet<String>(); + private HashSet<String> notConsumedGroup = new HashSet<String>(); + private String topic; + private int queueId; + private long offset; + + + public HashSet<String> getConsumedGroup() { + return consumedGroup; + } + + + public void setConsumedGroup(HashSet<String> consumedGroup) { + this.consumedGroup = consumedGroup; + } + + + public HashSet<String> getNotConsumedGroup() { + return notConsumedGroup; + } + + + public void setNotConsumedGroup(HashSet<String> notConsumedGroup) { + this.notConsumedGroup = notConsumedGroup; + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public int getQueueId() { + return queueId; + } + + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + + public long getOffset() { + return offset; + } + + + public void setOffset(long offset) { + this.offset = offset; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java new file mode 100644 index 0000000..9c63010 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + + +public class ConsumeMessageDirectlyResult extends RemotingSerializable { + private boolean order = false; + private boolean autoCommit = true; + private CMResult consumeResult; + private String remark; + private long spentTimeMills; + + + public boolean isOrder() { + return order; + } + + + public void setOrder(boolean order) { + this.order = order; + } + + + public boolean isAutoCommit() { + return autoCommit; + } + + + public void setAutoCommit(boolean autoCommit) { + this.autoCommit = autoCommit; + } + + + public String getRemark() { + return remark; + } + + + public void setRemark(String remark) { + this.remark = remark; + } + + + public CMResult getConsumeResult() { + return consumeResult; + } + + + public void setConsumeResult(CMResult consumeResult) { + this.consumeResult = consumeResult; + } + + + public long getSpentTimeMills() { + return spentTimeMills; + } + + + public void setSpentTimeMills(long spentTimeMills) { + this.spentTimeMills = spentTimeMills; + } + + + @Override + public String toString() { + return "ConsumeMessageDirectlyResult [order=" + order + ", autoCommit=" + autoCommit + + ", consumeResult=" + consumeResult + ", remark=" + 
remark + ", spentTimeMills=" + + spentTimeMills + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java new file mode 100644 index 0000000..8d1396a --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +/** + * @author shijia.wxr + */ +public class ConsumeStatsList extends RemotingSerializable { + private List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsList = new ArrayList<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>>(); + private String brokerAddr; + private long totalDiff; + + public List<Map<String, List<ConsumeStats>>> getConsumeStatsList() { + return consumeStatsList; + } + + public void setConsumeStatsList(List<Map<String, List<ConsumeStats>>> consumeStatsList) { + this.consumeStatsList = consumeStatsList; + } + + public String getBrokerAddr() { + return brokerAddr; + } + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + public long getTotalDiff() { + return totalDiff; + } + + public void setTotalDiff(long totalDiff) { + this.totalDiff = totalDiff; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatus.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatus.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatus.java new file mode 100644 index 0000000..35b6a02 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatus.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * Plain mutable holder for six consumer figures: five {@code double} values
 * ({@code pullRT}, {@code pullTPS}, {@code consumeRT}, {@code consumeOKTPS},
 * {@code consumeFailedTPS}) and one {@code long} counter
 * ({@code consumeFailedMsgs}).
 *
 * <p>NOTE(review): units of the RT/TPS values are not visible in this file —
 * confirm with the code that fills them in.
 */
public class ConsumeStatus {
    private double pullRT;
    private double pullTPS;
    private double consumeRT;
    private double consumeOKTPS;
    private double consumeFailedTPS;

    private long consumeFailedMsgs;


    /** @return the current {@code pullRT} */
    public double getPullRT() {
        return this.pullRT;
    }


    /** @param pullRT new value for {@code pullRT} */
    public void setPullRT(double pullRT) {
        this.pullRT = pullRT;
    }


    /** @return the current {@code pullTPS} */
    public double getPullTPS() {
        return this.pullTPS;
    }


    /** @param pullTPS new value for {@code pullTPS} */
    public void setPullTPS(double pullTPS) {
        this.pullTPS = pullTPS;
    }


    /** @return the current {@code consumeRT} */
    public double getConsumeRT() {
        return this.consumeRT;
    }


    /** @param consumeRT new value for {@code consumeRT} */
    public void setConsumeRT(double consumeRT) {
        this.consumeRT = consumeRT;
    }


    /** @return the current {@code consumeOKTPS} */
    public double getConsumeOKTPS() {
        return this.consumeOKTPS;
    }


    /** @param consumeOKTPS new value for {@code consumeOKTPS} */
    public void setConsumeOKTPS(double consumeOKTPS) {
        this.consumeOKTPS = consumeOKTPS;
    }


    /** @return the current {@code consumeFailedTPS} */
    public double getConsumeFailedTPS() {
        return this.consumeFailedTPS;
    }


    /** @param consumeFailedTPS new value for {@code consumeFailedTPS} */
    public void setConsumeFailedTPS(double consumeFailedTPS) {
        this.consumeFailedTPS = consumeFailedTPS;
    }


    /** @return the current {@code consumeFailedMsgs} */
    public long getConsumeFailedMsgs() {
        return this.consumeFailedMsgs;
    }


    /** @param consumeFailedMsgs new value for {@code consumeFailedMsgs} */
    public void setConsumeFailedMsgs(long consumeFailedMsgs) {
        this.consumeFailedMsgs = consumeFailedMsgs;
    }
}
---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java new file mode 100644 index 0000000..fc3ce46 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + */ +public class ConsumerConnection extends RemotingSerializable { + private HashSet<Connection> connectionSet = new HashSet<Connection>(); + private ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = + new ConcurrentHashMap<String, SubscriptionData>(); + private ConsumeType consumeType; + private MessageModel messageModel; + private ConsumeFromWhere consumeFromWhere; + + + public int computeMinVersion() { + int minVersion = Integer.MAX_VALUE; + for (Connection c : this.connectionSet) { + if (c.getVersion() < minVersion) { + minVersion = c.getVersion(); + } + } + + return minVersion; + } + + + public HashSet<Connection> getConnectionSet() { + return connectionSet; + } + + + public void setConnectionSet(HashSet<Connection> connectionSet) { + this.connectionSet = connectionSet; + } + + + public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() { + return subscriptionTable; + } + + + public void setSubscriptionTable(ConcurrentHashMap<String, SubscriptionData> subscriptionTable) { + this.subscriptionTable = subscriptionTable; + } + + + public ConsumeType getConsumeType() { + return consumeType; + } + + + public void setConsumeType(ConsumeType consumeType) { + this.consumeType = consumeType; + } + + + public MessageModel getMessageModel() { + return messageModel; + } + + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + + public ConsumeFromWhere getConsumeFromWhere() { + return 
consumeFromWhere; + } + + + public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { + this.consumeFromWhere = consumeFromWhere; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java new file mode 100644 index 0000000..5b4c6fb --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author manhong.yqd + */ +public class ConsumerOffsetSerializeWrapper extends RemotingSerializable { + private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = + new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); + + + public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() { + return offsetTable; + } + + + public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) { + this.offsetTable = offsetTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java new file mode 100644 index 0000000..9b0b383 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.*; +import java.util.Map.Entry; + +public class ConsumerRunningInfo extends RemotingSerializable { + public static final String PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR"; + public static final String PROP_THREADPOOL_CORE_SIZE = "PROP_THREADPOOL_CORE_SIZE"; + public static final String PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY"; + public static final String PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE"; + public static final String PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION"; + public static final String PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP"; + + + private Properties properties = new Properties(); + + private TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>(); + + private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>(); + + private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>(); + + private String jstack; + + public static boolean analyzeSubscription(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) { + ConsumerRunningInfo prev = criTable.firstEntry().getValue(); + + boolean push = false; + { + String property = 
prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE); + + if (property == null) { + property = ((ConsumeType) prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name(); + } + push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY; + } + + boolean startForAWhile = false; + { + + String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP); + if (property == null) { + property = String.valueOf(prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP)); + } + startForAWhile = (System.currentTimeMillis() - Long.parseLong(property)) > (1000 * 60 * 2); + } + + if (push && startForAWhile) { + + { + Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConsumerRunningInfo> next = it.next(); + ConsumerRunningInfo current = next.getValue(); + boolean equals = current.getSubscriptionSet().equals(prev.getSubscriptionSet()); + + if (!equals) { + // Different subscription in the same group of consumer + return false; + } + + prev = next.getValue(); + } + + if (prev != null) { + + if (prev.getSubscriptionSet().isEmpty()) { + // Subscription empty! 
+ return false; + } + } + } + } + + return true; + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + public TreeSet<SubscriptionData> getSubscriptionSet() { + return subscriptionSet; + } + + public void setSubscriptionSet(TreeSet<SubscriptionData> subscriptionSet) { + this.subscriptionSet = subscriptionSet; + } + + public static boolean analyzeRebalance(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) { + return true; + } + + public static String analyzeProcessQueue(final String clientId, ConsumerRunningInfo info) { + StringBuilder sb = new StringBuilder(); + boolean push = false; + { + String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE); + + if (property == null) { + property = ((ConsumeType) info.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name(); + } + push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY; + } + + boolean orderMsg = false; + { + String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_ORDERLY); + orderMsg = Boolean.parseBoolean(property); + } + + if (push) { + Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = info.getMqTable().entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, ProcessQueueInfo> next = it.next(); + MessageQueue mq = next.getKey(); + ProcessQueueInfo pq = next.getValue(); + + + if (orderMsg) { + + if (!pq.isLocked()) { + sb.append(String.format("%s %s can't lock for a while, %dms%n", // + clientId, // + mq, // + System.currentTimeMillis() - pq.getLastLockTimestamp())); + } else { + if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) { + sb.append(String.format("%s %s unlock %d times, still failed%n", // + clientId, // + mq, // + pq.getTryUnlockTimes())); + } + } + + + } else { + long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp(); + + if (diff > (1000 * 60) && 
pq.getCachedMsgCount() > 0) { + sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", // + clientId, // + mq, // + diff)); + } + } + } + } + + return sb.toString(); + } + + public TreeMap<MessageQueue, ProcessQueueInfo> getMqTable() { + return mqTable; + } + + public void setMqTable(TreeMap<MessageQueue, ProcessQueueInfo> mqTable) { + this.mqTable = mqTable; + } + + public TreeMap<String, ConsumeStatus> getStatusTable() { + return statusTable; + } + + public void setStatusTable(TreeMap<String, ConsumeStatus> statusTable) { + this.statusTable = statusTable; + } + + public String formatString() { + StringBuilder sb = new StringBuilder(); + + { + sb.append("#Consumer Properties#\n"); + Iterator<Entry<Object, Object>> it = this.properties.entrySet().iterator(); + while (it.hasNext()) { + Entry<Object, Object> next = it.next(); + String item = String.format("%-40s: %s%n", next.getKey().toString(), next.getValue().toString()); + sb.append(item); + } + } + + { + sb.append("\n\n#Consumer Subscription#\n"); + + Iterator<SubscriptionData> it = this.subscriptionSet.iterator(); + int i = 0; + while (it.hasNext()) { + SubscriptionData next = it.next(); + String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", // + ++i, // + next.getTopic(), // + next.isClassFilterMode(), // + next.getSubString()); + + sb.append(item); + } + } + + { + sb.append("\n\n#Consumer Offset#\n"); + sb.append(String.format("%-32s %-32s %-4s %-20s%n", // + "#Topic", // + "#Broker Name", // + "#QID", // + "#Consumer Offset"// + )); + + Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, ProcessQueueInfo> next = it.next(); + String item = String.format("%-32s %-32s %-4d %-20d%n", // + next.getKey().getTopic(), // + next.getKey().getBrokerName(), // + next.getKey().getQueueId(), // + next.getValue().getCommitOffset()); + + sb.append(item); + } + } + + { + 
sb.append("\n\n#Consumer MQ Detail#\n"); + sb.append(String.format("%-32s %-32s %-4s %-20s%n", // + "#Topic", // + "#Broker Name", // + "#QID", // + "#ProcessQueueInfo"// + )); + + Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, ProcessQueueInfo> next = it.next(); + String item = String.format("%-32s %-32s %-4d %s%n", // + next.getKey().getTopic(), // + next.getKey().getBrokerName(), // + next.getKey().getQueueId(), // + next.getValue().toString()); + + sb.append(item); + } + } + + { + sb.append("\n\n#Consumer RT&TPS#\n"); + sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", // + "#Topic", // + "#Pull RT", // + "#Pull TPS", // + "#Consume RT", // + "#ConsumeOK TPS", // + "#ConsumeFailed TPS", // + "#ConsumeFailedMsgsInHour"// + )); + + Iterator<Entry<String, ConsumeStatus>> it = this.statusTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConsumeStatus> next = it.next(); + String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", // + next.getKey(), // + next.getValue().getPullRT(), // + next.getValue().getPullTPS(), // + next.getValue().getConsumeRT(), // + next.getValue().getConsumeOKTPS(), // + next.getValue().getConsumeFailedTPS(), // + next.getValue().getConsumeFailedMsgs()// + ); + + sb.append(item); + } + } + + if (this.jstack != null) { + sb.append("\n\n#Consumer jstack#\n"); + sb.append(this.jstack); + } + + return sb.toString(); + } + + public String getJstack() { + return jstack; + } + + + public void setJstack(String jstack) { + this.jstack = jstack; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java new file mode 100644 index 0000000..71d8667 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashMap; +import java.util.Map; + + +/** + * @author manhong.yqd + */ +@Deprecated +public class GetConsumerStatusBody extends RemotingSerializable { + private Map<MessageQueue, Long> messageQueueTable = new HashMap<MessageQueue, Long>(); + private Map<String, Map<MessageQueue, Long>> consumerTable = + new HashMap<String, Map<MessageQueue, Long>>(); + + + public Map<MessageQueue, Long> getMessageQueueTable() { + return messageQueueTable; + } + + + public void setMessageQueueTable(Map<MessageQueue, Long> messageQueueTable) { + this.messageQueueTable = messageQueueTable; + } + + + public Map<String, Map<MessageQueue, Long>> getConsumerTable() { + return consumerTable; + } + + + public void setConsumerTable(Map<String, Map<MessageQueue, Long>> consumerTable) { + this.consumerTable = consumerTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java new file mode 100644 index 0000000..db7e071 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashSet; + + +/** + * @author shijia.wxr + * + */ +public class GroupList extends RemotingSerializable { + private HashSet<String> groupList = new HashSet<String>(); + + + public HashSet<String> getGroupList() { + return groupList; + } + + + public void setGroupList(HashSet<String> groupList) { + this.groupList = groupList; + } +}
