http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java new file mode 100644 index 0000000..fc06d6e --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * String constants naming message properties, together with a set that
 * collects every {@code PROPERTY_*} name declared in this class.
 */
public class MessageConst {
    public static final String PROPERTY_KEYS = "KEYS";
    public static final String PROPERTY_TAGS = "TAGS";
    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
    public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
    public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
    public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
    public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
    public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
    public static final String PROPERTY_BUYER_ID = "BUYER_ID";
    public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
    public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
    public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
    public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
    public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
    public static final String PROPERTY_MSG_REGION = "MSG_REGION";
    public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
    public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
    public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";

    public static final String KEY_SEPARATOR = " ";

    /** Holds the value of every {@code PROPERTY_*} constant declared above. */
    public static final HashSet<String> STRING_HASH_SET = new HashSet<String>();

    static {
        // Same members, same insertion order as a sequence of individual add() calls.
        final String[] propertyNames = {
            PROPERTY_TRACE_SWITCH,
            PROPERTY_MSG_REGION,
            PROPERTY_KEYS,
            PROPERTY_TAGS,
            PROPERTY_WAIT_STORE_MSG_OK,
            PROPERTY_DELAY_TIME_LEVEL,
            PROPERTY_RETRY_TOPIC,
            PROPERTY_REAL_TOPIC,
            PROPERTY_REAL_QUEUE_ID,
            PROPERTY_TRANSACTION_PREPARED,
            PROPERTY_PRODUCER_GROUP,
            PROPERTY_MIN_OFFSET,
            PROPERTY_MAX_OFFSET,
            PROPERTY_BUYER_ID,
            PROPERTY_ORIGIN_MESSAGE_ID,
            PROPERTY_TRANSFER_FLAG,
            PROPERTY_CORRECTION_FLAG,
            PROPERTY_MQ2_FLAG,
            PROPERTY_RECONSUME_TIME,
            PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
            PROPERTY_MAX_RECONSUME_TIMES,
            PROPERTY_CONSUME_START_TIMESTAMP,
        };
        for (String propertyName : propertyNames) {
            STRING_HASH_SET.add(propertyName);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java new file mode 100644 index 0000000..e21c1ca --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.common.message; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * @author shijia.wxr + */ +public class MessageDecoder { + public final static int MSG_ID_LENGTH = 8 + 8; + + public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8"); + public final static int MESSAGE_MAGIC_CODE_POSTION = 4; + public final static int MESSAGE_FLAG_POSTION = 16; + public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28; + public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56; + public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8; + + + public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) { + input.flip(); + input.limit(MessageDecoder.MSG_ID_LENGTH); + + input.put(addr); + input.putLong(offset); + + return UtilAll.bytes2string(input.array()); + } + + + public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) { + ByteBuffer byteBuffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + byteBuffer.put(inetSocketAddress.getAddress().getAddress()); + byteBuffer.putInt(inetSocketAddress.getPort()); + byteBuffer.putLong(transactionIdhashCode); + byteBuffer.flip(); + return UtilAll.bytes2string(byteBuffer.array()); + } + + + public static MessageId decodeMessageId(final String msgId) throws UnknownHostException { + SocketAddress address; + long offset; + + + byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8)); + byte[] port = 
UtilAll.string2bytes(msgId.substring(8, 16)); + ByteBuffer bb = ByteBuffer.wrap(port); + int portInt = bb.getInt(0); + address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt); + + // offset + byte[] data = UtilAll.string2bytes(msgId.substring(16, 32)); + bb = ByteBuffer.wrap(data); + offset = bb.getLong(0); + + return new MessageId(address, offset); + } + + + public static MessageExt decode(java.nio.ByteBuffer byteBuffer) { + return decode(byteBuffer, true, true, false); + } + + public static MessageExt clientDecode(java.nio.ByteBuffer byteBuffer, final boolean readBody) { + return decode(byteBuffer, readBody, true, true); + } + + public static MessageExt decode(java.nio.ByteBuffer byteBuffer, final boolean readBody) { + return decode(byteBuffer, readBody, true, false); + } + + + public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception { + byte[] body = messageExt.getBody(); + byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8); + byte topicLen = (byte) topics.length; + String properties = messageProperties2String(messageExt.getProperties()); + byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8); + short propertiesLength = (short) propertiesBytes.length; + int sysFlag = messageExt.getSysFlag(); + byte[] newBody = messageExt.getBody(); + if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) { + newBody = UtilAll.compress(body, 5); + } + int bodyLength = newBody.length; + int storeSize = messageExt.getStoreSize(); + ByteBuffer byteBuffer; + if (storeSize > 0) { + byteBuffer = ByteBuffer.allocate(storeSize); + } else { + storeSize = 4 // 1 TOTALSIZE + + 4 // 2 MAGICCODE + + 4 // 3 BODYCRC + + 4 // 4 QUEUEID + + 4 // 5 FLAG + + 8 // 6 QUEUEOFFSET + + 8 // 7 PHYSICALOFFSET + + 4 // 8 SYSFLAG + + 8 // 9 BORNTIMESTAMP + + 8 // 10 BORNHOST + + 8 // 11 STORETIMESTAMP + + 8 // 12 STOREHOSTADDRESS + + 4 // 13 RECONSUMETIMES + + 8 // 14 Prepared Transaction Offset + + 4 
+ bodyLength // 14 BODY + + 1 + topicLen // 15 TOPIC + + 2 + propertiesLength // 16 propertiesLength + + 0; + byteBuffer = ByteBuffer.allocate(storeSize); + } + // 1 TOTALSIZE + byteBuffer.putInt(storeSize); + + // 2 MAGICCODE + byteBuffer.putInt(MESSAGE_MAGIC_CODE); + + // 3 BODYCRC + int bodyCRC = messageExt.getBodyCRC(); + byteBuffer.putInt(bodyCRC); + + // 4 QUEUEID + int queueId = messageExt.getQueueId(); + byteBuffer.putInt(queueId); + + // 5 FLAG + int flag = messageExt.getFlag(); + byteBuffer.putInt(flag); + + // 6 QUEUEOFFSET + long queueOffset = messageExt.getQueueOffset(); + byteBuffer.putLong(queueOffset); + + // 7 PHYSICALOFFSET + long physicOffset = messageExt.getCommitLogOffset(); + byteBuffer.putLong(physicOffset); + + // 8 SYSFLAG + byteBuffer.putInt(sysFlag); + + // 9 BORNTIMESTAMP + long bornTimeStamp = messageExt.getBornTimestamp(); + byteBuffer.putLong(bornTimeStamp); + + // 10 BORNHOST + InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost(); + byteBuffer.put(bornHost.getAddress().getAddress()); + byteBuffer.putInt(bornHost.getPort()); + + // 11 STORETIMESTAMP + long storeTimestamp = messageExt.getStoreTimestamp(); + byteBuffer.putLong(storeTimestamp); + + // 12 STOREHOST + InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost(); + byteBuffer.put(serverHost.getAddress().getAddress()); + byteBuffer.putInt(serverHost.getPort()); + + // 13 RECONSUMETIMES + int reconsumeTimes = messageExt.getReconsumeTimes(); + byteBuffer.putInt(reconsumeTimes); + + // 14 Prepared Transaction Offset + long preparedTransactionOffset = messageExt.getPreparedTransactionOffset(); + byteBuffer.putLong(preparedTransactionOffset); + + // 15 BODY + byteBuffer.putInt(bodyLength); + byteBuffer.put(newBody); + + // 16 TOPIC + byteBuffer.put(topicLen); + byteBuffer.put(topics); + + // 17 properties + byteBuffer.putShort(propertiesLength); + byteBuffer.put(propertiesBytes); + + return byteBuffer.array(); + } + + public static MessageExt 
decode( + java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) { + return decode(byteBuffer, readBody, deCompressBody, false); + } + + public static MessageExt decode( + java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) { + try { + + MessageExt msgExt; + if (isClient) { + msgExt = new MessageClientExt(); + } else { + msgExt = new MessageExt(); + } + + // 1 TOTALSIZE + int storeSize = byteBuffer.getInt(); + msgExt.setStoreSize(storeSize); + + // 2 MAGICCODE + byteBuffer.getInt(); + + // 3 BODYCRC + int bodyCRC = byteBuffer.getInt(); + msgExt.setBodyCRC(bodyCRC); + + // 4 QUEUEID + int queueId = byteBuffer.getInt(); + msgExt.setQueueId(queueId); + + // 5 FLAG + int flag = byteBuffer.getInt(); + msgExt.setFlag(flag); + + // 6 QUEUEOFFSET + long queueOffset = byteBuffer.getLong(); + msgExt.setQueueOffset(queueOffset); + + // 7 PHYSICALOFFSET + long physicOffset = byteBuffer.getLong(); + msgExt.setCommitLogOffset(physicOffset); + + // 8 SYSFLAG + int sysFlag = byteBuffer.getInt(); + msgExt.setSysFlag(sysFlag); + + // 9 BORNTIMESTAMP + long bornTimeStamp = byteBuffer.getLong(); + msgExt.setBornTimestamp(bornTimeStamp); + + // 10 BORNHOST + byte[] bornHost = new byte[4]; + byteBuffer.get(bornHost, 0, 4); + int port = byteBuffer.getInt(); + msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port)); + + // 11 STORETIMESTAMP + long storeTimestamp = byteBuffer.getLong(); + msgExt.setStoreTimestamp(storeTimestamp); + + // 12 STOREHOST + byte[] storeHost = new byte[4]; + byteBuffer.get(storeHost, 0, 4); + port = byteBuffer.getInt(); + msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port)); + + // 13 RECONSUMETIMES + int reconsumeTimes = byteBuffer.getInt(); + msgExt.setReconsumeTimes(reconsumeTimes); + + // 14 Prepared Transaction Offset + long preparedTransactionOffset = byteBuffer.getLong(); + 
msgExt.setPreparedTransactionOffset(preparedTransactionOffset); + + // 15 BODY + int bodyLen = byteBuffer.getInt(); + if (bodyLen > 0) { + if (readBody) { + byte[] body = new byte[bodyLen]; + byteBuffer.get(body); + + // uncompress body + if (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) { + body = UtilAll.uncompress(body); + } + + msgExt.setBody(body); + } else { + byteBuffer.position(byteBuffer.position() + bodyLen); + } + } + + // 16 TOPIC + byte topicLen = byteBuffer.get(); + byte[] topic = new byte[(int) topicLen]; + byteBuffer.get(topic); + msgExt.setTopic(new String(topic, CHARSET_UTF8)); + + // 17 properties + short propertiesLength = byteBuffer.getShort(); + if (propertiesLength > 0) { + byte[] properties = new byte[propertiesLength]; + byteBuffer.get(properties); + String propertiesString = new String(properties, CHARSET_UTF8); + Map<String, String> map = string2messageProperties(propertiesString); + msgExt.setProperties(map); + } + + ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH); + String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset()); + msgExt.setMsgId(msgId); + + if (isClient) { + ((MessageClientExt) msgExt).setOffsetMsgId(msgId); + } + + return msgExt; + } catch (UnknownHostException e) { + byteBuffer.position(byteBuffer.limit()); + } catch (BufferUnderflowException e) { + byteBuffer.position(byteBuffer.limit()); + } catch (Exception e) { + byteBuffer.position(byteBuffer.limit()); + } + + return null; + } + + + public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer) { + return decodes(byteBuffer, true); + } + + public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer, final boolean readBody) { + List<MessageExt> msgExts = new ArrayList<MessageExt>(); + while (byteBuffer.hasRemaining()) { + MessageExt msgExt = clientDecode(byteBuffer, readBody); + if (null != msgExt) { + msgExts.add(msgExt); + } else { + 
break; + } + } + return msgExts; + } + + public static final char NAME_VALUE_SEPARATOR = 1; + public static final char PROPERTY_SEPARATOR = 2; + + + public static String messageProperties2String(Map<String, String> properties) { + StringBuilder sb = new StringBuilder(); + if (properties != null) { + for (final Map.Entry<String, String> entry : properties.entrySet()) { + final String name = entry.getKey(); + final String value = entry.getValue(); + + sb.append(name); + sb.append(NAME_VALUE_SEPARATOR); + sb.append(value); + sb.append(PROPERTY_SEPARATOR); + } + } + return sb.toString(); + } + + public static Map<String, String> string2messageProperties(final String properties) { + Map<String, String> map = new HashMap<String, String>(); + if (properties != null) { + String[] items = properties.split(String.valueOf(PROPERTY_SEPARATOR)); + if (items != null) { + for (String i : items) { + String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR)); + if (nv != null && 2 == nv.length) { + map.put(nv[0], nv[1]); + } + } + } + } + + return map; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java new file mode 100644 index 0000000..627935d --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.common.message; + +import com.alibaba.rocketmq.common.TopicFilterType; +import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + + +/** + * @author shijia.wxr + */ +public class MessageExt extends Message { + private static final long serialVersionUID = 5720810158625748049L; + + private int queueId; + + private int storeSize; + + private long queueOffset; + private int sysFlag; + private long bornTimestamp; + private SocketAddress bornHost; + + private long storeTimestamp; + private SocketAddress storeHost; + private String msgId; + private long commitLogOffset; + private int bodyCRC; + private int reconsumeTimes; + + private long preparedTransactionOffset; + + + public MessageExt() { + } + + + public MessageExt(int queueId, long bornTimestamp, SocketAddress bornHost, long storeTimestamp, + SocketAddress storeHost, String msgId) { + this.queueId = queueId; + this.bornTimestamp = bornTimestamp; + this.bornHost = bornHost; + this.storeTimestamp = storeTimestamp; + this.storeHost = storeHost; + this.msgId = msgId; + } + + public static TopicFilterType parseTopicFilterType(final int sysFlag) { + if ((sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG) { + return TopicFilterType.MULTI_TAG; + } + + return TopicFilterType.SINGLE_TAG; + 
} + + public ByteBuffer getBornHostBytes() { + return socketAddress2ByteBuffer(this.bornHost); + } + + public ByteBuffer getBornHostBytes(ByteBuffer byteBuffer) { + return socketAddress2ByteBuffer(this.bornHost, byteBuffer); + } + + private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4); + byteBuffer.putInt(inetSocketAddress.getPort()); + byteBuffer.flip(); + return byteBuffer; + } + + public static ByteBuffer socketAddress2ByteBuffer(SocketAddress socketAddress) { + ByteBuffer byteBuffer = ByteBuffer.allocate(8); + return socketAddress2ByteBuffer(socketAddress, byteBuffer); + } + + public ByteBuffer getStoreHostBytes() { + return socketAddress2ByteBuffer(this.storeHost); + } + + public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) { + return socketAddress2ByteBuffer(this.storeHost, byteBuffer); + } + + public int getQueueId() { + return queueId; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + public long getBornTimestamp() { + return bornTimestamp; + } + + public void setBornTimestamp(long bornTimestamp) { + this.bornTimestamp = bornTimestamp; + } + + public SocketAddress getBornHost() { + return bornHost; + } + + public void setBornHost(SocketAddress bornHost) { + this.bornHost = bornHost; + } + + public String getBornHostString() { + if (this.bornHost != null) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost; + return inetSocketAddress.getAddress().getHostAddress(); + } + + return null; + } + + public String getBornHostNameString() { + if (this.bornHost != null) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost; + return inetSocketAddress.getAddress().getHostName(); + } + + return null; + } + + public long getStoreTimestamp() { + return storeTimestamp; + } + + public void 
setStoreTimestamp(long storeTimestamp) { + this.storeTimestamp = storeTimestamp; + } + + public SocketAddress getStoreHost() { + return storeHost; + } + + public void setStoreHost(SocketAddress storeHost) { + this.storeHost = storeHost; + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public int getSysFlag() { + return sysFlag; + } + + public void setSysFlag(int sysFlag) { + this.sysFlag = sysFlag; + } + + public int getBodyCRC() { + return bodyCRC; + } + + public void setBodyCRC(int bodyCRC) { + this.bodyCRC = bodyCRC; + } + + public long getQueueOffset() { + return queueOffset; + } + + public void setQueueOffset(long queueOffset) { + this.queueOffset = queueOffset; + } + + public long getCommitLogOffset() { + return commitLogOffset; + } + + public void setCommitLogOffset(long physicOffset) { + this.commitLogOffset = physicOffset; + } + + public int getStoreSize() { + return storeSize; + } + + public void setStoreSize(int storeSize) { + this.storeSize = storeSize; + } + + public int getReconsumeTimes() { + return reconsumeTimes; + } + + + public void setReconsumeTimes(int reconsumeTimes) { + this.reconsumeTimes = reconsumeTimes; + } + + + public long getPreparedTransactionOffset() { + return preparedTransactionOffset; + } + + + public void setPreparedTransactionOffset(long preparedTransactionOffset) { + this.preparedTransactionOffset = preparedTransactionOffset; + } + + + @Override + public String toString() { + return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset + + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost + + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId + + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes=" + + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset + + ", toString()=" + 
super.toString() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java new file mode 100644 index 0000000..d08be86 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.common.message; + +import java.net.SocketAddress; + + +/** + * @author shijia.wxr + */ +public class MessageId { + private SocketAddress address; + private long offset; + + + public MessageId(SocketAddress address, long offset) { + this.address = address; + this.offset = offset; + } + + + public SocketAddress getAddress() { + return address; + } + + + public void setAddress(SocketAddress address) { + this.address = address; + } + + + public long getOffset() { + return offset; + } + + + public void setOffset(long offset) { + this.offset = offset; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java new file mode 100644 index 0000000..35d2827 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Identifies a queue by topic, broker name and queue id.
 *
 * <p>Equality and hashing use all three fields and tolerate {@code null}
 * strings. Ordering compares topic, then broker name, then queue id.
 *
 * @author shijia.wxr
 */
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic;
    private String brokerName;
    private int queueId;


    public MessageQueue() {

    }


    /**
     * @param topic topic the queue belongs to
     * @param brokerName name of the broker hosting the queue
     * @param queueId id of the queue
     */
    public MessageQueue(String topic, String brokerName, int queueId) {
        this.topic = topic;
        this.brokerName = brokerName;
        this.queueId = queueId;
    }


    public String getTopic() {
        return topic;
    }


    public void setTopic(String topic) {
        this.topic = topic;
    }


    public String getBrokerName() {
        return brokerName;
    }


    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }


    public int getQueueId() {
        return queueId;
    }


    public void setQueueId(int queueId) {
        this.queueId = queueId;
    }


    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
        result = prime * result + queueId;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        return result;
    }


    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        MessageQueue other = (MessageQueue) obj;
        if (brokerName == null) {
            if (other.brokerName != null) {
                return false;
            }
        } else if (!brokerName.equals(other.brokerName)) {
            return false;
        }
        if (queueId != other.queueId) {
            return false;
        }
        if (topic == null) {
            if (other.topic != null) {
                return false;
            }
        } else if (!topic.equals(other.topic)) {
            return false;
        }
        return true;
    }


    @Override
    public String toString() {
        return "MessageQueue [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId + "]";
    }


    /**
     * Orders queues by topic, then broker name, then queue id.
     *
     * @param o the queue to compare with
     * @return a negative value, zero or a positive value when this queue sorts
     *         before, equal to or after {@code o}
     * @throws NullPointerException if {@code o} is null, or if the topic or
     *         broker name of either queue is null when it is compared
     */
    @Override
    public int compareTo(MessageQueue o) {
        int result = this.topic.compareTo(o.topic);
        if (result != 0) {
            return result;
        }

        result = this.brokerName.compareTo(o.brokerName);
        if (result != 0) {
            return result;
        }

        // Explicit comparison rather than subtraction: "this.queueId - o.queueId"
        // overflows for ids far apart and then reports the wrong order.
        if (this.queueId < o.queueId) {
            return -1;
        }
        return (this.queueId == o.queueId) ? 0 : 1;
    }
}
/**
 * Identifies a queue by topic, broker name and queue id, and carries an offset.
 *
 * <p>Equality uses all four fields; the hash code uses topic, broker name and
 * queue id only. Ordering compares topic, broker name, queue id, then offset.
 *
 * @author lansheng.zj
 */
public class MessageQueueForC implements Comparable<MessageQueueForC>, Serializable {

    private static final long serialVersionUID = 5320967846569962104L;
    private String topic;
    private String brokerName;
    private int queueId;
    private long offset;


    /**
     * @param topic topic the queue belongs to
     * @param brokerName name of the broker hosting the queue
     * @param queueId id of the queue
     * @param offset offset associated with the queue
     */
    public MessageQueueForC(String topic, String brokerName, int queueId, long offset) {
        this.topic = topic;
        this.brokerName = brokerName;
        this.queueId = queueId;
        this.offset = offset;
    }


    /**
     * Orders by topic, then broker name, then queue id, then offset.
     *
     * @param o the queue to compare with
     * @return a negative value, zero or a positive value when this queue sorts
     *         before, equal to or after {@code o}
     * @throws NullPointerException if {@code o} is null, or if the topic or
     *         broker name of either queue is null when it is compared
     */
    @Override
    public int compareTo(MessageQueueForC o) {
        int result = this.topic.compareTo(o.topic);
        if (result != 0) {
            return result;
        }
        result = this.brokerName.compareTo(o.brokerName);
        if (result != 0) {
            return result;
        }

        // Numeric fields are compared directly; subtracting them overflows for
        // values far apart and then yields the wrong sign.
        if (this.queueId != o.queueId) {
            return (this.queueId < o.queueId) ? -1 : 1;
        }
        if (this.offset != o.offset) {
            return (this.offset < o.offset) ? -1 : 1;
        }
        return 0;
    }


    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
        result = prime * result + queueId;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        return result;
    }


    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        MessageQueueForC other = (MessageQueueForC) obj;
        if (brokerName == null) {
            if (other.brokerName != null) {
                return false;
            }
        } else if (!brokerName.equals(other.brokerName)) {
            return false;
        }
        if (queueId != other.queueId) {
            return false;
        }
        if (topic == null) {
            if (other.topic != null) {
                return false;
            }
        } else if (!topic.equals(other.topic)) {
            return false;
        }

        if (offset != other.offset) {
            return false;
        }
        return true;
    }


    @Override
    public String toString() {
        return "MessageQueueForC [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId
            + ", offset=" + offset + "]";
    }


    public String getTopic() {
        return topic;
    }


    public void setTopic(String topic) {
        this.topic = topic;
    }


    public String getBrokerName() {
        return brokerName;
    }


    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }


    public int getQueueId() {
        return queueId;
    }


    public void setQueueId(int queueId) {
        this.queueId = queueId;
    }


    public long getOffset() {
        return offset;
    }


    public void setOffset(long offset) {
        this.offset = offset;
    }
}
/**
 * Kinds of message, in declaration order: {@code Normal_Msg}, {@code Trans_Msg_Half},
 * {@code Trans_msg_Commit}, {@code Delay_Msg}.
 *
 * <p>NOTE(review): the names suggest normal, half (prepared) transactional, committed
 * transactional and delayed messages — confirm against the code that uses them.
 * Constant names (including the mixed-case {@code Trans_msg_Commit}) and their order
 * are part of the public surface and must not change.
 */
public enum MessageType {
    Normal_Msg,

    Trans_Msg_Half,

    Trans_msg_Commit,

    Delay_Msg
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: NamesrvConfig.java 1839 2013-05-16 02:12:02Z shijia.wxr $ + */ +package com.alibaba.rocketmq.common.namesrv; + +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + + +/** + * + * @author shijia.wxr + * @author lansheng.zj + */ +public class NamesrvConfig { + private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); + private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + + private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; + private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; + private String productEnvName = "center"; + private boolean clusterTest = false; + private boolean orderMessageEnable = false; + + public boolean isOrderMessageEnable() { + return orderMessageEnable; + } + + public void setOrderMessageEnable(boolean orderMessageEnable) { + this.orderMessageEnable = orderMessageEnable; + } + + public String getRocketmqHome() { + return rocketmqHome; + } + + + public void setRocketmqHome(String rocketmqHome) { + this.rocketmqHome = rocketmqHome; + } + + + public String getKvConfigPath() { + return kvConfigPath; + } + + + public void setKvConfigPath(String kvConfigPath) { + this.kvConfigPath = kvConfigPath; + } + + + public String 
getProductEnvName() { + return productEnvName; + } + + + public void setProductEnvName(String productEnvName) { + this.productEnvName = productEnvName; + } + + + public boolean isClusterTest() { + return clusterTest; + } + + + public void setClusterTest(boolean clusterTest) { + this.clusterTest = clusterTest; + } + + public String getConfigStorePath() { + return configStorePath; + } + + public void setConfigStorePath(final String configStorePath) { + this.configStorePath = configStorePath; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java new file mode 100644 index 0000000..fcc32d9 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Holder for name-server related constants.
 *
 * @author shijia.wxr
 */
public class NamesrvUtil {

    /** Namespace key with the literal value {@code "ORDER_TOPIC_CONFIG"}. */
    public static final String NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG";

}
+ */ + +package com.alibaba.rocketmq.common.namesrv; + +import com.alibaba.rocketmq.common.protocol.body.KVTable; + + +/** + * @author shijia.wxr + */ +public class RegisterBrokerResult { + private String haServerAddr; + private String masterAddr; + private KVTable kvTable; + + + public String getHaServerAddr() { + return haServerAddr; + } + + + public void setHaServerAddr(String haServerAddr) { + this.haServerAddr = haServerAddr; + } + + + public String getMasterAddr() { + return masterAddr; + } + + + public void setMasterAddr(String masterAddr) { + this.masterAddr = masterAddr; + } + + + public KVTable getKvTable() { + return kvTable; + } + + + public void setKvTable(KVTable kvTable) { + this.kvTable = kvTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java new file mode 100644 index 0000000..2e4ad87 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: TopAddressing.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.common.namesrv; + +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.help.FAQUrl; +import com.alibaba.rocketmq.common.utils.HttpTinyClient; +import com.alibaba.rocketmq.common.utils.HttpTinyClient.HttpResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + + +/** + * @author shijia.wxr + * @author manhong.yqd + */ +public class TopAddressing { + private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + private String nsAddr; + private String wsAddr; + private String unitName; + + + public TopAddressing(final String wsAddr) { + this(wsAddr, null); + } + + + public TopAddressing(final String wsAddr, final String unitName) { + this.wsAddr = wsAddr; + this.unitName = unitName; + } + + public final String fetchNSAddr() { + return fetchNSAddr(true, 3000); + } + + public final String fetchNSAddr(boolean verbose, long timeoutMills) { + String url = this.wsAddr; + try { + if (!UtilAll.isBlank(this.unitName)) { + url = url + "-" + this.unitName + "?nofix=1"; + } + HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills); + if (200 == result.code) { + String responseStr = result.content; + if (responseStr != null) { + return clearNewLine(responseStr); + } else { + log.error("fetch nameserver address is null"); + } + } 
else { + log.error("fetch nameserver address failed. statusCode={}", result.code); + } + } catch (IOException e) { + if (verbose) { + log.error("fetch name server address exception", e); + } + } + + if (verbose) { + String errorMsg = + "connect to " + url + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + " not bind in /etc/hosts"; + errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL); + + log.warn(errorMsg); + } + return null; + } + + private static String clearNewLine(final String str) { + String newString = str.trim(); + int index = newString.indexOf("\r"); + if (index != -1) { + return newString.substring(0, index); + } + + index = newString.indexOf("\n"); + if (index != -1) { + return newString.substring(0, index); + } + + return newString; + } + + public String getNsAddr() { + return nsAddr; + } + + + public void setNsAddr(String nsAddr) { + this.nsAddr = nsAddr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java new file mode 100644 index 0000000..aaaa51d --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol; + +import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; + + +/** + * @author shijia.wxr + */ +public class MQProtosHelper { + public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr, + final long timeoutMillis) { + RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); + requestHeader.setBrokerAddr(brokerAddr); + + RemotingCommand request = + RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); + + try { + RemotingCommand response = RemotingHelper.invokeSync(nsaddr, request, timeoutMillis); + if (response != null) { + return ResponseCode.SUCCESS == response.getCode(); + } + } catch (RemotingConnectException e) { + e.printStackTrace(); + } catch (RemotingSendRequestException e) { + e.printStackTrace(); + } catch (RemotingTimeoutException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java 
---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java new file mode 100644 index 0000000..a8b8698 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Integer request codes used by the remoting protocol.
 *
 * <p>The constants are listed in ascending numeric order and grouped by value range only;
 * the grouping carries no meaning beyond readability. Names and values are part of the
 * wire protocol and must not change (including the historical spelling
 * {@code GET_ROUTEINTO_BY_TOPIC}).
 */
public class RequestCode {

    // ---- 10 .. 45 ----
    public static final int SEND_MESSAGE = 10;
    public static final int PULL_MESSAGE = 11;
    public static final int QUERY_MESSAGE = 12;
    public static final int QUERY_BROKER_OFFSET = 13;
    public static final int QUERY_CONSUMER_OFFSET = 14;
    public static final int UPDATE_CONSUMER_OFFSET = 15;
    public static final int UPDATE_AND_CREATE_TOPIC = 17;
    public static final int GET_ALL_TOPIC_CONFIG = 21;
    public static final int GET_TOPIC_CONFIG_LIST = 22;
    public static final int GET_TOPIC_NAME_LIST = 23;
    public static final int UPDATE_BROKER_CONFIG = 25;
    public static final int GET_BROKER_CONFIG = 26;
    public static final int TRIGGER_DELETE_FILES = 27;
    public static final int GET_BROKER_RUNTIME_INFO = 28;
    public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
    public static final int GET_MAX_OFFSET = 30;
    public static final int GET_MIN_OFFSET = 31;
    public static final int GET_EARLIEST_MSG_STORETIME = 32;
    public static final int VIEW_MESSAGE_BY_ID = 33;
    public static final int HEART_BEAT = 34;
    public static final int UNREGISTER_CLIENT = 35;
    public static final int CONSUMER_SEND_MSG_BACK = 36;
    public static final int END_TRANSACTION = 37;
    public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
    public static final int CHECK_TRANSACTION_STATE = 39;
    public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
    public static final int LOCK_BATCH_MQ = 41;
    public static final int UNLOCK_BATCH_MQ = 42;
    public static final int GET_ALL_CONSUMER_OFFSET = 43;
    public static final int GET_ALL_DELAY_OFFSET = 45;

    // ---- 100 .. 106 ----
    public static final int PUT_KV_CONFIG = 100;
    public static final int GET_KV_CONFIG = 101;
    public static final int DELETE_KV_CONFIG = 102;
    public static final int REGISTER_BROKER = 103;
    public static final int UNREGISTER_BROKER = 104;
    public static final int GET_ROUTEINTO_BY_TOPIC = 105;
    public static final int GET_BROKER_CLUSTER_INFO = 106;

    // ---- 200 .. 224 ----
    public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
    public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
    public static final int GET_TOPIC_STATS_INFO = 202;
    public static final int GET_CONSUMER_CONNECTION_LIST = 203;
    public static final int GET_PRODUCER_CONNECTION_LIST = 204;
    public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
    public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
    public static final int DELETE_SUBSCRIPTIONGROUP = 207;
    public static final int GET_CONSUME_STATS = 208;
    public static final int SUSPEND_CONSUMER = 209;
    public static final int RESUME_CONSUMER = 210;
    public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
    public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;
    public static final int ADJUST_CONSUMER_THREAD_POOL = 213;
    public static final int WHO_CONSUME_THE_MESSAGE = 214;
    public static final int DELETE_TOPIC_IN_BROKER = 215;
    public static final int DELETE_TOPIC_IN_NAMESRV = 216;
    public static final int GET_KVLIST_BY_NAMESPACE = 219;
    public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
    public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
    public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
    public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
    public static final int GET_TOPICS_BY_CLUSTER = 224;

    // ---- 300 .. 319 ----
    public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
    public static final int REGISTER_FILTER_SERVER = 301;
    public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
    public static final int QUERY_CONSUME_TIME_SPAN = 303;
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
    public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
    public static final int GET_CONSUMER_RUNNING_INFO = 307;
    public static final int QUERY_CORRECTION_OFFSET = 308;
    public static final int CONSUME_MESSAGE_DIRECTLY = 309;
    public static final int SEND_MESSAGE_V2 = 310;
    public static final int GET_UNIT_TOPIC_LIST = 311;
    public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
    public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
    public static final int CLONE_GROUP_OFFSET = 314;
    public static final int VIEW_BROKER_STATS_DATA = 315;
    public static final int CLEAN_UNUSED_TOPIC = 316;
    public static final int GET_BROKER_CONSUME_STATS = 317;

    /** Update the config of the name server. */
    public static final int UPDATE_NAMESRV_CONFIG = 318;

    /** Get the config from the name server. */
    public static final int GET_NAMESRV_CONFIG = 319;
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode; + + +public class ResponseCode extends RemotingSysResponseCode { + + public static final int FLUSH_DISK_TIMEOUT = 10; + + public static final int SLAVE_NOT_AVAILABLE = 11; + + public static final int FLUSH_SLAVE_TIMEOUT = 12; + + public static final int MESSAGE_ILLEGAL = 13; + + public static final int SERVICE_NOT_AVAILABLE = 14; + + public static final int VERSION_NOT_SUPPORTED = 15; + + public static final int NO_PERMISSION = 16; + + public static final int TOPIC_NOT_EXIST = 17; + public static final int TOPIC_EXIST_ALREADY = 18; + public static final int PULL_NOT_FOUND = 19; + + public static final int PULL_RETRY_IMMEDIATELY = 20; + + public static final int PULL_OFFSET_MOVED = 21; + + public static final int QUERY_NOT_FOUND = 22; + + public static final int SUBSCRIPTION_PARSE_FAILED = 23; + + public static final int SUBSCRIPTION_NOT_EXIST = 24; + + public static final int SUBSCRIPTION_NOT_LATEST = 25; + + public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26; + + public static final int TRANSACTION_SHOULD_COMMIT = 200; + + public static final int TRANSACTION_SHOULD_ROLLBACK = 201; + + public static final int TRANSACTION_STATE_UNKNOW = 202; + + public static final int TRANSACTION_STATE_GROUP_WRONG = 203; + public static final int NO_BUYER_ID = 204; + + + public static final int NOT_IN_CURRENT_UNIT = 205; + + + public static final int CONSUMER_NOT_ONLINE = 206; + + + public static final int CONSUME_MSG_TIMEOUT = 207; + + 
+ public static final int NO_MESSAGE = 208; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java new file mode 100644 index 0000000..6f51b06 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + + +public class BrokerStatsData extends RemotingSerializable { + + private BrokerStatsItem statsMinute; + + private BrokerStatsItem statsHour; + + private BrokerStatsItem statsDay; + + + public BrokerStatsItem getStatsMinute() { + return statsMinute; + } + + + public void setStatsMinute(BrokerStatsItem statsMinute) { + this.statsMinute = statsMinute; + } + + + public BrokerStatsItem getStatsHour() { + return statsHour; + } + + + public void setStatsHour(BrokerStatsItem statsHour) { + this.statsHour = statsHour; + } + + + public BrokerStatsItem getStatsDay() { + return statsDay; + } + + + public void setStatsDay(BrokerStatsItem statsDay) { + this.statsDay = statsDay; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java new file mode 100644 index 0000000..1cf6c3d --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Mutable bean with three numeric statistics: {@code sum}, {@code tps} and {@code avgpt}.
 * All fields start at zero.
 *
 * <p>NOTE(review): {@code tps} and {@code avgpt} look like "per second" and "average per
 * time" figures — confirm against the code that fills them in.
 */
public class BrokerStatsItem {
    private long sum;
    private double tps;
    private double avgpt;


    /** @return the current {@code sum} value */
    public long getSum() {
        return this.sum;
    }

    public void setSum(final long sum) {
        this.sum = sum;
    }


    /** @return the current {@code tps} value */
    public double getTps() {
        return this.tps;
    }

    public void setTps(final double tps) {
        this.tps = tps;
    }


    /** @return the current {@code avgpt} value */
    public double getAvgpt() {
        return this.avgpt;
    }

    public void setAvgpt(final double avgpt) {
        this.avgpt = avgpt;
    }
}
/**
 * Result values, in declaration order: {@code CR_SUCCESS}, {@code CR_LATER},
 * {@code CR_ROLLBACK}, {@code CR_COMMIT}, {@code CR_THROW_EXCEPTION},
 * {@code CR_RETURN_NULL}.
 *
 * <p>NOTE(review): presumably the outcome of consuming a message — confirm against the
 * callers. Names and order must stay as they are.
 */
public enum CMResult {
    CR_SUCCESS,

    CR_LATER,

    CR_ROLLBACK,

    CR_COMMIT,

    CR_THROW_EXCEPTION,

    CR_RETURN_NULL
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class ClusterInfo extends RemotingSerializable { + private HashMap<String/* brokerName */, BrokerData> brokerAddrTable; + private HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; + + + public HashMap<String, BrokerData> getBrokerAddrTable() { + return brokerAddrTable; + } + + + public void setBrokerAddrTable(HashMap<String, BrokerData> brokerAddrTable) { + this.brokerAddrTable = brokerAddrTable; + } + + + public HashMap<String, Set<String>> getClusterAddrTable() { + return clusterAddrTable; + } + + + public void setClusterAddrTable(HashMap<String, Set<String>> clusterAddrTable) { + this.clusterAddrTable = clusterAddrTable; + } + + + public String[] retrieveAllAddrByCluster(String cluster) { + List<String> addrs = new ArrayList<String>(); + if (clusterAddrTable.containsKey(cluster)) { + Set<String> brokerNames = clusterAddrTable.get(cluster); + for (String brokerName : brokerNames) { + BrokerData brokerData = brokerAddrTable.get(brokerName); + if (null != brokerData) { + addrs.addAll(brokerData.getBrokerAddrs().values()); + } + } + } + + return addrs.toArray(new String[]{}); + } + + + public String[] retrieveAllClusterNames() { + return clusterAddrTable.keySet().toArray(new String[]{}); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java new file mode 100644 index 0000000..72cf601 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.remoting.protocol.LanguageCode; + + +/** + * @author shijia.wxr + */ +public class Connection { + private String clientId; + private String clientAddr; + private LanguageCode language; + private int version; + + + public String getClientId() { + return clientId; + } + + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + + public String getClientAddr() { + return clientAddr; + } + + + public void setClientAddr(String clientAddr) { + this.clientAddr = clientAddr; + } + + + public LanguageCode getLanguage() { + return language; + } + + + public void setLanguage(LanguageCode language) { + this.language = language; + } + + + public int getVersion() { + return version; + } + + + public void setVersion(int version) { + this.version = version; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java new file mode 100644 index 0000000..8a69352 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashSet; + + +/** + * @author shijia.wxr + * + */ +public class ConsumeByWho extends RemotingSerializable { + private HashSet<String> consumedGroup = new HashSet<String>(); + private HashSet<String> notConsumedGroup = new HashSet<String>(); + private String topic; + private int queueId; + private long offset; + + + public HashSet<String> getConsumedGroup() { + return consumedGroup; + } + + + public void setConsumedGroup(HashSet<String> consumedGroup) { + this.consumedGroup = consumedGroup; + } + + + public HashSet<String> getNotConsumedGroup() { + return notConsumedGroup; + } + + + public void setNotConsumedGroup(HashSet<String> notConsumedGroup) { + this.notConsumedGroup = notConsumedGroup; + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public int getQueueId() { + return queueId; + } + + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + + public long getOffset() { + return offset; + } + + + public void setOffset(long offset) { + this.offset = offset; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java new file mode 100644 index 0000000..c895fe2 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + + +public class ConsumeMessageDirectlyResult extends RemotingSerializable { + private boolean order = false; + private boolean autoCommit = true; + private CMResult consumeResult; + private String remark; + private long spentTimeMills; + + + public boolean isOrder() { + return order; + } + + + public void setOrder(boolean order) { + this.order = order; + } + + + public boolean isAutoCommit() { + return autoCommit; + } + + + public void setAutoCommit(boolean autoCommit) { + this.autoCommit = autoCommit; + } + + + public String getRemark() { + return remark; + } + + + public void setRemark(String remark) { + this.remark = remark; + } + + + public CMResult getConsumeResult() { + return consumeResult; + } + + + public void setConsumeResult(CMResult consumeResult) { + this.consumeResult = consumeResult; + } + + + public long getSpentTimeMills() { + return spentTimeMills; + } + + + public void setSpentTimeMills(long spentTimeMills) { + this.spentTimeMills = spentTimeMills; + } + + + @Override + public String toString() { + return "ConsumeMessageDirectlyResult [order=" + order + ", autoCommit=" + autoCommit + + ", consumeResult=" + consumeResult + ", remark=" + remark + ", spentTimeMills=" + + spentTimeMills + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java new file mode 100644 index 0000000..a1c608d --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java @@ -0,0 +1,58 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.common.admin.ConsumeStats; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +/** + * @author shijia.wxr + */ +public class ConsumeStatsList extends RemotingSerializable { + private List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsList = new ArrayList<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>>(); + private String brokerAddr; + private long totalDiff; + + public List<Map<String, List<ConsumeStats>>> getConsumeStatsList() { + return consumeStatsList; + } + + public void setConsumeStatsList(List<Map<String, List<ConsumeStats>>> consumeStatsList) { + this.consumeStatsList = consumeStatsList; + } + + public String getBrokerAddr() { + return brokerAddr; + } + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + public long getTotalDiff() { + return totalDiff; + } + + public void setTotalDiff(long totalDiff) { + this.totalDiff = totalDiff; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java new file mode 100644 index 0000000..dcb6281 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Value holder for six consume-side metrics: five {@code double} figures
 * (pull RT, pull TPS, consume RT, consume-OK TPS, consume-failed TPS) and one
 * {@code long} count of failed messages. Every property starts at zero and is
 * stored as given, with no validation.
 *
 * <p>NOTE(review): units (e.g. milliseconds for the RT values) are not
 * established by this class; confirm against the code that populates it.
 */
public class ConsumeStatus {
    private double pullRT;
    private double pullTPS;
    private double consumeRT;
    private double consumeOKTPS;
    private double consumeFailedTPS;

    private long consumeFailedMsgs;


    /** @return the stored pull RT value */
    public double getPullRT() {
        return this.pullRT;
    }


    /** @param pullRT the pull RT value to store */
    public void setPullRT(final double pullRT) {
        this.pullRT = pullRT;
    }


    /** @return the stored pull TPS value */
    public double getPullTPS() {
        return this.pullTPS;
    }


    /** @param pullTPS the pull TPS value to store */
    public void setPullTPS(final double pullTPS) {
        this.pullTPS = pullTPS;
    }


    /** @return the stored consume RT value */
    public double getConsumeRT() {
        return this.consumeRT;
    }


    /** @param consumeRT the consume RT value to store */
    public void setConsumeRT(final double consumeRT) {
        this.consumeRT = consumeRT;
    }


    /** @return the stored consume-OK TPS value */
    public double getConsumeOKTPS() {
        return this.consumeOKTPS;
    }


    /** @param consumeOKTPS the consume-OK TPS value to store */
    public void setConsumeOKTPS(final double consumeOKTPS) {
        this.consumeOKTPS = consumeOKTPS;
    }


    /** @return the stored consume-failed TPS value */
    public double getConsumeFailedTPS() {
        return this.consumeFailedTPS;
    }


    /** @param consumeFailedTPS the consume-failed TPS value to store */
    public void setConsumeFailedTPS(final double consumeFailedTPS) {
        this.consumeFailedTPS = consumeFailedTPS;
    }


    /** @return the stored count of failed messages */
    public long getConsumeFailedMsgs() {
        return this.consumeFailedMsgs;
    }


    /** @param consumeFailedMsgs the count of failed messages to store */
    public void setConsumeFailedMsgs(final long consumeFailedMsgs) {
        this.consumeFailedMsgs = consumeFailedMsgs;
    }
}
