http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java deleted file mode 100644 index fc06d6e..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.message; - -import java.util.HashSet; - - -public class MessageConst { - public static final String PROPERTY_KEYS = "KEYS"; - public static final String PROPERTY_TAGS = "TAGS"; - public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT"; - public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY"; - public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC"; - public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC"; - public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID"; - public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"; - public static final String PROPERTY_PRODUCER_GROUP = "PGROUP"; - public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET"; - public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET"; - public static final String PROPERTY_BUYER_ID = "BUYER_ID"; - public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID"; - public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG"; - public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG"; - public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG"; - public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME"; - public static final String PROPERTY_MSG_REGION = "MSG_REGION"; - public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON"; - public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY"; - public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES"; - public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME"; - - public static final String KEY_SEPARATOR = " "; - - public static final HashSet<String> STRING_HASH_SET = new HashSet<String>(); - - - static { - STRING_HASH_SET.add(PROPERTY_TRACE_SWITCH); - STRING_HASH_SET.add(PROPERTY_MSG_REGION); - STRING_HASH_SET.add(PROPERTY_KEYS); - STRING_HASH_SET.add(PROPERTY_TAGS); - STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK); - STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL); - STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC); - STRING_HASH_SET.add(PROPERTY_REAL_TOPIC); - STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID); - STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED); - STRING_HASH_SET.add(PROPERTY_PRODUCER_GROUP); - STRING_HASH_SET.add(PROPERTY_MIN_OFFSET); - STRING_HASH_SET.add(PROPERTY_MAX_OFFSET); - STRING_HASH_SET.add(PROPERTY_BUYER_ID); - STRING_HASH_SET.add(PROPERTY_ORIGIN_MESSAGE_ID); - STRING_HASH_SET.add(PROPERTY_TRANSFER_FLAG); - STRING_HASH_SET.add(PROPERTY_CORRECTION_FLAG); - STRING_HASH_SET.add(PROPERTY_MQ2_FLAG); - STRING_HASH_SET.add(PROPERTY_RECONSUME_TIME); - STRING_HASH_SET.add(PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); - STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES); - STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java deleted file mode 100644 index e21c1ca..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java +++ /dev/null @@ -1,395 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.message; - -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -/** - * @author shijia.wxr - */ -public class MessageDecoder { - public final static int MSG_ID_LENGTH = 8 + 8; - - public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8"); - public final static int MESSAGE_MAGIC_CODE_POSTION = 4; - public final static int MESSAGE_FLAG_POSTION = 16; - public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28; - public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56; - public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8; - - - public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) { - input.flip(); - input.limit(MessageDecoder.MSG_ID_LENGTH); - - input.put(addr); - input.putLong(offset); - - return UtilAll.bytes2string(input.array()); - } - - - public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) { - ByteBuffer byteBuffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); - InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; - byteBuffer.put(inetSocketAddress.getAddress().getAddress()); - byteBuffer.putInt(inetSocketAddress.getPort()); - byteBuffer.putLong(transactionIdhashCode); - byteBuffer.flip(); - return UtilAll.bytes2string(byteBuffer.array()); - } - - - public static MessageId decodeMessageId(final String msgId) throws UnknownHostException { - SocketAddress address; - long offset; - - - byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8)); - byte[] port = UtilAll.string2bytes(msgId.substring(8, 16)); - ByteBuffer bb = ByteBuffer.wrap(port); - int portInt = bb.getInt(0); - address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt); - - // offset - byte[] data = UtilAll.string2bytes(msgId.substring(16, 32)); - bb = ByteBuffer.wrap(data); - offset = bb.getLong(0); - - return new MessageId(address, offset); - } - - - public static MessageExt decode(java.nio.ByteBuffer byteBuffer) { - return decode(byteBuffer, true, true, false); - } - - public static MessageExt clientDecode(java.nio.ByteBuffer byteBuffer, final boolean readBody) { - return decode(byteBuffer, readBody, true, true); - } - - public static MessageExt decode(java.nio.ByteBuffer byteBuffer, final boolean readBody) { - return decode(byteBuffer, readBody, true, false); - } - - - public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception { - byte[] body = messageExt.getBody(); - byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8); - byte topicLen = (byte) topics.length; - String properties = messageProperties2String(messageExt.getProperties()); - byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8); - short propertiesLength = (short) propertiesBytes.length; - int sysFlag = messageExt.getSysFlag(); - byte[] newBody = messageExt.getBody(); - if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) { - newBody = UtilAll.compress(body, 5); - } - int bodyLength = newBody.length; - int storeSize = messageExt.getStoreSize(); - ByteBuffer byteBuffer; - if (storeSize > 0) { - byteBuffer = ByteBuffer.allocate(storeSize); - } else { - storeSize = 4 // 1 TOTALSIZE - + 4 // 2 MAGICCODE - + 4 // 3 BODYCRC - + 4 // 4 QUEUEID - + 4 // 5 FLAG - + 8 // 6 QUEUEOFFSET - + 8 // 7 PHYSICALOFFSET - + 4 // 8 SYSFLAG - + 8 // 9 BORNTIMESTAMP - + 8 // 10 BORNHOST - + 8 // 11 STORETIMESTAMP - + 8 // 12 STOREHOSTADDRESS - + 4 // 13 RECONSUMETIMES - + 8 // 14 Prepared Transaction Offset - + 4 + bodyLength // 14 BODY - + 1 + topicLen // 15 TOPIC - + 2 + propertiesLength // 16 propertiesLength - + 0; - byteBuffer = ByteBuffer.allocate(storeSize); - } - // 1 TOTALSIZE - byteBuffer.putInt(storeSize); - - // 2 MAGICCODE - byteBuffer.putInt(MESSAGE_MAGIC_CODE); - - // 3 BODYCRC - int bodyCRC = messageExt.getBodyCRC(); - byteBuffer.putInt(bodyCRC); - - // 4 QUEUEID - int queueId = messageExt.getQueueId(); - byteBuffer.putInt(queueId); - - // 5 FLAG - int flag = messageExt.getFlag(); - byteBuffer.putInt(flag); - - // 6 QUEUEOFFSET - long queueOffset = messageExt.getQueueOffset(); - byteBuffer.putLong(queueOffset); - - // 7 PHYSICALOFFSET - long physicOffset = messageExt.getCommitLogOffset(); - byteBuffer.putLong(physicOffset); - - // 8 SYSFLAG - byteBuffer.putInt(sysFlag); - - // 9 BORNTIMESTAMP - long bornTimeStamp = messageExt.getBornTimestamp(); - byteBuffer.putLong(bornTimeStamp); - - // 10 BORNHOST - InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost(); - byteBuffer.put(bornHost.getAddress().getAddress()); - byteBuffer.putInt(bornHost.getPort()); - - // 11 STORETIMESTAMP - long storeTimestamp = messageExt.getStoreTimestamp(); - byteBuffer.putLong(storeTimestamp); - - // 12 STOREHOST - InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost(); - byteBuffer.put(serverHost.getAddress().getAddress()); - byteBuffer.putInt(serverHost.getPort()); - - // 13 RECONSUMETIMES - int reconsumeTimes = messageExt.getReconsumeTimes(); - byteBuffer.putInt(reconsumeTimes); - - // 14 Prepared Transaction Offset - long preparedTransactionOffset = messageExt.getPreparedTransactionOffset(); - byteBuffer.putLong(preparedTransactionOffset); - - // 15 BODY - byteBuffer.putInt(bodyLength); - byteBuffer.put(newBody); - - // 16 TOPIC - byteBuffer.put(topicLen); - byteBuffer.put(topics); - - // 17 properties - byteBuffer.putShort(propertiesLength); - byteBuffer.put(propertiesBytes); - - return byteBuffer.array(); - } - - public static MessageExt decode( - java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) { - return decode(byteBuffer, readBody, deCompressBody, false); - } - - public static MessageExt decode( - java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) { - try { - - MessageExt msgExt; - if (isClient) { - msgExt = new MessageClientExt(); - } else { - msgExt = new MessageExt(); - } - - // 1 TOTALSIZE - int storeSize = byteBuffer.getInt(); - msgExt.setStoreSize(storeSize); - - // 2 MAGICCODE - byteBuffer.getInt(); - - // 3 BODYCRC - int bodyCRC = byteBuffer.getInt(); - msgExt.setBodyCRC(bodyCRC); - - // 4 QUEUEID - int queueId = byteBuffer.getInt(); - msgExt.setQueueId(queueId); - - // 5 FLAG - int flag = byteBuffer.getInt(); - msgExt.setFlag(flag); - - // 6 QUEUEOFFSET - long queueOffset = byteBuffer.getLong(); - msgExt.setQueueOffset(queueOffset); - - // 7 PHYSICALOFFSET - long physicOffset = byteBuffer.getLong(); - msgExt.setCommitLogOffset(physicOffset); - - // 8 SYSFLAG - int sysFlag = byteBuffer.getInt(); - msgExt.setSysFlag(sysFlag); - - // 9 BORNTIMESTAMP - long bornTimeStamp = byteBuffer.getLong(); - msgExt.setBornTimestamp(bornTimeStamp); - - // 10 BORNHOST - byte[] bornHost = new byte[4]; - byteBuffer.get(bornHost, 0, 4); - int port = byteBuffer.getInt(); - msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port)); - - // 11 STORETIMESTAMP - long storeTimestamp = byteBuffer.getLong(); - msgExt.setStoreTimestamp(storeTimestamp); - - // 12 STOREHOST - byte[] storeHost = new byte[4]; - byteBuffer.get(storeHost, 0, 4); - port = byteBuffer.getInt(); - msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port)); - - // 13 RECONSUMETIMES - int reconsumeTimes = byteBuffer.getInt(); - msgExt.setReconsumeTimes(reconsumeTimes); - - // 14 Prepared Transaction Offset - long preparedTransactionOffset = byteBuffer.getLong(); - msgExt.setPreparedTransactionOffset(preparedTransactionOffset); - - // 15 BODY - int bodyLen = byteBuffer.getInt(); - if (bodyLen > 0) { - if (readBody) { - byte[] body = new byte[bodyLen]; - byteBuffer.get(body); - - // uncompress body - if (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) { - body = UtilAll.uncompress(body); - } - - msgExt.setBody(body); - } else { - byteBuffer.position(byteBuffer.position() + bodyLen); - } - } - - // 16 TOPIC - byte topicLen = byteBuffer.get(); - byte[] topic = new byte[(int) topicLen]; - byteBuffer.get(topic); - msgExt.setTopic(new String(topic, CHARSET_UTF8)); - - // 17 properties - short propertiesLength = byteBuffer.getShort(); - if (propertiesLength > 0) { - byte[] properties = new byte[propertiesLength]; - byteBuffer.get(properties); - String propertiesString = new String(properties, CHARSET_UTF8); - Map<String, String> map = string2messageProperties(propertiesString); - msgExt.setProperties(map); - } - - ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH); - String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset()); - msgExt.setMsgId(msgId); - - if (isClient) { - ((MessageClientExt) msgExt).setOffsetMsgId(msgId); - } - - return msgExt; - } catch (UnknownHostException e) { - byteBuffer.position(byteBuffer.limit()); - } catch (BufferUnderflowException e) { - byteBuffer.position(byteBuffer.limit()); - } catch (Exception e) { - byteBuffer.position(byteBuffer.limit()); - } - - return null; - } - - - public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer) { - return decodes(byteBuffer, true); - } - - public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer, final boolean readBody) { - List<MessageExt> msgExts = new ArrayList<MessageExt>(); - while (byteBuffer.hasRemaining()) { - MessageExt msgExt = clientDecode(byteBuffer, readBody); - if (null != msgExt) { - msgExts.add(msgExt); - } else { - break; - } - } - return msgExts; - } - - public static final char NAME_VALUE_SEPARATOR = 1; - public static final char PROPERTY_SEPARATOR = 2; - - - public static String messageProperties2String(Map<String, String> properties) { - StringBuilder sb = new StringBuilder(); - if (properties != null) { - for (final Map.Entry<String, String> entry : properties.entrySet()) { - final String name = entry.getKey(); - final String value = entry.getValue(); - - sb.append(name); - sb.append(NAME_VALUE_SEPARATOR); - sb.append(value); - sb.append(PROPERTY_SEPARATOR); - } - } - return sb.toString(); - } - - public static Map<String, String> string2messageProperties(final String properties) { - Map<String, String> map = new HashMap<String, String>(); - if (properties != null) { - String[] items = properties.split(String.valueOf(PROPERTY_SEPARATOR)); - if (items != null) { - for (String i : items) { - String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR)); - if (nv != null && 2 == nv.length) { - map.put(nv[0], nv[1]); - } - } - } - } - - return map; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java deleted file mode 100644 index 627935d..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java +++ /dev/null @@ -1,238 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.message; - -import com.alibaba.rocketmq.common.TopicFilterType; -import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - - -/** - * @author shijia.wxr - */ -public class MessageExt extends Message { - private static final long serialVersionUID = 5720810158625748049L; - - private int queueId; - - private int storeSize; - - private long queueOffset; - private int sysFlag; - private long bornTimestamp; - private SocketAddress bornHost; - - private long storeTimestamp; - private SocketAddress storeHost; - private String msgId; - private long commitLogOffset; - private int bodyCRC; - private int reconsumeTimes; - - private long preparedTransactionOffset; - - - public MessageExt() { - } - - - public MessageExt(int queueId, long bornTimestamp, SocketAddress bornHost, long storeTimestamp, - SocketAddress storeHost, String msgId) { - this.queueId = queueId; - this.bornTimestamp = bornTimestamp; - this.bornHost = bornHost; - this.storeTimestamp = storeTimestamp; - this.storeHost = storeHost; - this.msgId = msgId; - } - - public static TopicFilterType parseTopicFilterType(final int sysFlag) { - if ((sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG) { - return TopicFilterType.MULTI_TAG; - } - - return TopicFilterType.SINGLE_TAG; - } - - public ByteBuffer getBornHostBytes() { - return socketAddress2ByteBuffer(this.bornHost); - } - - public ByteBuffer getBornHostBytes(ByteBuffer byteBuffer) { - return socketAddress2ByteBuffer(this.bornHost, byteBuffer); - } - - private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) { - InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; - byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4); - byteBuffer.putInt(inetSocketAddress.getPort()); - byteBuffer.flip(); - return byteBuffer; - } - - public static ByteBuffer socketAddress2ByteBuffer(SocketAddress socketAddress) { - ByteBuffer byteBuffer = ByteBuffer.allocate(8); - return socketAddress2ByteBuffer(socketAddress, byteBuffer); - } - - public ByteBuffer getStoreHostBytes() { - return socketAddress2ByteBuffer(this.storeHost); - } - - public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) { - return socketAddress2ByteBuffer(this.storeHost, byteBuffer); - } - - public int getQueueId() { - return queueId; - } - - public void setQueueId(int queueId) { - this.queueId = queueId; - } - - public long getBornTimestamp() { - return bornTimestamp; - } - - public void setBornTimestamp(long bornTimestamp) { - this.bornTimestamp = bornTimestamp; - } - - public SocketAddress getBornHost() { - return bornHost; - } - - public void setBornHost(SocketAddress bornHost) { - this.bornHost = bornHost; - } - - public String getBornHostString() { - if (this.bornHost != null) { - InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost; - return inetSocketAddress.getAddress().getHostAddress(); - } - - return null; - } - - public String getBornHostNameString() { - if (this.bornHost != null) { - InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost; - return inetSocketAddress.getAddress().getHostName(); - } - - return null; - } - - public long getStoreTimestamp() { - return storeTimestamp; - } - - public void setStoreTimestamp(long storeTimestamp) { - this.storeTimestamp = storeTimestamp; - } - - public SocketAddress getStoreHost() { - return storeHost; - } - - public void setStoreHost(SocketAddress storeHost) { - this.storeHost = storeHost; - } - - public String getMsgId() { - return msgId; - } - - public void setMsgId(String msgId) { - this.msgId = msgId; - } - - public int getSysFlag() { - return sysFlag; - } - - public void setSysFlag(int sysFlag) { - this.sysFlag = sysFlag; - } - - public int getBodyCRC() { - return bodyCRC; - } - - public void setBodyCRC(int bodyCRC) { - this.bodyCRC = bodyCRC; - } - - public long getQueueOffset() { - return queueOffset; - } - - public void setQueueOffset(long queueOffset) { - this.queueOffset = queueOffset; - } - - public long getCommitLogOffset() { - return commitLogOffset; - } - - public void setCommitLogOffset(long physicOffset) { - this.commitLogOffset = physicOffset; - } - - public int getStoreSize() { - return storeSize; - } - - public void setStoreSize(int storeSize) { - this.storeSize = storeSize; - } - - public int getReconsumeTimes() { - return reconsumeTimes; - } - - - public void setReconsumeTimes(int reconsumeTimes) { - this.reconsumeTimes = reconsumeTimes; - } - - - public long getPreparedTransactionOffset() { - return preparedTransactionOffset; - } - - - public void setPreparedTransactionOffset(long preparedTransactionOffset) { - this.preparedTransactionOffset = preparedTransactionOffset; - } - - - @Override - public String toString() { - return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset - + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost - + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId - + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes=" - + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset - + ", toString()=" + super.toString() + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java deleted file mode 100644 index d08be86..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.message; - -import java.net.SocketAddress; - - -/** - * @author shijia.wxr - */ -public class MessageId { - private SocketAddress address; - private long offset; - - - public MessageId(SocketAddress address, long offset) { - this.address = address; - this.offset = offset; - } - - - public SocketAddress getAddress() { - return address; - } - - - public void setAddress(SocketAddress address) { - this.address = address; - } - - - public long getOffset() { - return offset; - } - - - public void setOffset(long offset) { - this.offset = offset; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java deleted file mode 100644 index 35d2827..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.message; - -import java.io.Serializable; - - -/** - * @author shijia.wxr - */ -public class MessageQueue implements Comparable<MessageQueue>, Serializable { - private static final long serialVersionUID = 6191200464116433425L; - private String topic; - private String brokerName; - private int queueId; - - - public MessageQueue() { - - } - - - public MessageQueue(String topic, String brokerName, int queueId) { - this.topic = topic; - this.brokerName = brokerName; - this.queueId = queueId; - } - - - public String getTopic() { - return topic; - } - - - public void setTopic(String topic) { - this.topic = topic; - } - - - public String getBrokerName() { - return brokerName; - } - - - public void setBrokerName(String brokerName) { - this.brokerName = brokerName; - } - - - public int getQueueId() { - return queueId; - } - - - public void setQueueId(int queueId) { - this.queueId = queueId; - } - - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode()); - result = prime * result + queueId; - result = prime * result + ((topic == null) ? 0 : topic.hashCode()); - return result; - } - - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - MessageQueue other = (MessageQueue) obj; - if (brokerName == null) { - if (other.brokerName != null) - return false; - } else if (!brokerName.equals(other.brokerName)) - return false; - if (queueId != other.queueId) - return false; - if (topic == null) { - if (other.topic != null) - return false; - } else if (!topic.equals(other.topic)) - return false; - return true; - } - - - @Override - public String toString() { - return "MessageQueue [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId + "]"; - } - - - @Override - public int compareTo(MessageQueue o) { - { - int result = this.topic.compareTo(o.topic); - if (result != 0) { - return result; - } - } - - { - int result = this.brokerName.compareTo(o.brokerName); - if (result != 0) { - return result; - } - } - - return this.queueId - o.queueId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java deleted file mode 100644 index a905af6..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.message; - -import java.io.Serializable; - - -/** - * @author lansheng.zj - */ -public class MessageQueueForC implements Comparable<MessageQueueForC>, Serializable { - - private static final long serialVersionUID = 5320967846569962104L; - private String topic; - private String brokerName; - private int queueId; - private long offset; - - - public MessageQueueForC(String topic, String brokerName, int queueId, long offset) { - this.topic = topic; - this.brokerName = brokerName; - this.queueId = queueId; - this.offset = offset; - } - - - @Override - public int compareTo(MessageQueueForC o) { - int result = this.topic.compareTo(o.topic); - if (result != 0) { - return result; - } - result = this.brokerName.compareTo(o.brokerName); - if (result != 0) { - return result; - } - result = this.queueId - o.queueId; - if (result != 0) { - return result; - } - if ((this.offset - o.offset) > 0) { - return 1; - } else if ((this.offset - o.offset) == 0) { - return 0; - } else { - return -1; - } - } - - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode()); - result = prime * result + queueId; - result = prime * result + ((topic == null) ? 0 : topic.hashCode()); - return result; - } - - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - MessageQueueForC other = (MessageQueueForC) obj; - if (brokerName == null) { - if (other.brokerName != null) - return false; - } else if (!brokerName.equals(other.brokerName)) - return false; - if (queueId != other.queueId) - return false; - if (topic == null) { - if (other.topic != null) - return false; - } else if (!topic.equals(other.topic)) - return false; - - if (offset != other.offset) { - return false; - } - return true; - } - - - @Override - public String toString() { - return "MessageQueueForC [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId - + ", offset=" + offset + "]"; - } - - - public String getTopic() { - return topic; - } - - - public void setTopic(String topic) { - this.topic = topic; - } - - - public String getBrokerName() { - return brokerName; - } - - - public void setBrokerName(String brokerName) { - this.brokerName = brokerName; - } - - - public int getQueueId() { - return queueId; - } - - - public void setQueueId(int queueId) { - this.queueId = queueId; - } - - - public long getOffset() { - return offset; - } - - - public void setOffset(long offset) { - this.offset = offset; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java deleted file mode 100644 index 164eb87..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.message; - -public enum MessageType { - Normal_Msg, - Trans_Msg_Half, - Trans_msg_Commit, - Delay_Msg, -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java deleted file mode 100644 index 08db357..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * $Id: NamesrvConfig.java 1839 2013-05-16 02:12:02Z shijia.wxr $ - */ -package com.alibaba.rocketmq.common.namesrv; - -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; - - -/** - * - * @author shijia.wxr - * @author lansheng.zj - */ -public class NamesrvConfig { - private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); - private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); - - private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; - private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; - private String productEnvName = "center"; - private boolean clusterTest = false; - private boolean orderMessageEnable = false; - - public boolean isOrderMessageEnable() { - return orderMessageEnable; - } - - public void setOrderMessageEnable(boolean orderMessageEnable) { - this.orderMessageEnable = orderMessageEnable; - } - - public String getRocketmqHome() { - return rocketmqHome; - } - - - public void setRocketmqHome(String rocketmqHome) { - this.rocketmqHome = rocketmqHome; - } - - - public String getKvConfigPath() { - return kvConfigPath; - } - - - public void setKvConfigPath(String kvConfigPath) { - this.kvConfigPath = kvConfigPath; - } - - - public String getProductEnvName() { - return productEnvName; - } - - - public void setProductEnvName(String productEnvName) { - this.productEnvName = productEnvName; - } - - - public boolean isClusterTest() { - return clusterTest; - } - - - public void setClusterTest(boolean clusterTest) { - this.clusterTest = clusterTest; - } - - public String getConfigStorePath() { - return configStorePath; - } - - public void setConfigStorePath(final String configStorePath) { - this.configStorePath = configStorePath; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java b/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java deleted file mode 100644 index fcc32d9..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.namesrv; - -/** - * @author shijia.wxr - */ -public class NamesrvUtil { - public static final String NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG"; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java b/common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java deleted file mode 100644 index 68bf44a..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.namesrv; - -import com.alibaba.rocketmq.common.protocol.body.KVTable; - - -/** - * @author shijia.wxr - */ -public class RegisterBrokerResult { - private String haServerAddr; - private String masterAddr; - private KVTable kvTable; - - - public String getHaServerAddr() { - return haServerAddr; - } - - - public void setHaServerAddr(String haServerAddr) { - this.haServerAddr = haServerAddr; - } - - - public String getMasterAddr() { - return masterAddr; - } - - - public void setMasterAddr(String masterAddr) { - this.masterAddr = masterAddr; - } - - - public KVTable getKvTable() { - return kvTable; - } - - - public void setKvTable(KVTable kvTable) { - this.kvTable = kvTable; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java b/common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java deleted file mode 100644 index 2e4ad87..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * $Id: TopAddressing.java 1831 2013-05-16 01:39:51Z shijia.wxr $ - */ -package com.alibaba.rocketmq.common.namesrv; - -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.help.FAQUrl; -import com.alibaba.rocketmq.common.utils.HttpTinyClient; -import com.alibaba.rocketmq.common.utils.HttpTinyClient.HttpResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - - -/** - * @author shijia.wxr - * @author manhong.yqd - */ -public class TopAddressing { - private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); - private String nsAddr; - private String wsAddr; - private String unitName; - - - public TopAddressing(final String wsAddr) { - this(wsAddr, null); - } - - - public TopAddressing(final String wsAddr, final String unitName) { - this.wsAddr = wsAddr; - this.unitName = unitName; - } - - public final String fetchNSAddr() { - return fetchNSAddr(true, 3000); - } - - public final String fetchNSAddr(boolean verbose, long timeoutMills) { - String url = this.wsAddr; - try { - if (!UtilAll.isBlank(this.unitName)) { - url = url + "-" + this.unitName + "?nofix=1"; - } - HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills); - if (200 == result.code) { - String responseStr = result.content; - if (responseStr != null) { - return clearNewLine(responseStr); - } else { - log.error("fetch nameserver address is null"); - } - } else { - log.error("fetch nameserver address failed. statusCode={}", result.code); - } - } catch (IOException e) { - if (verbose) { - log.error("fetch name server address exception", e); - } - } - - if (verbose) { - String errorMsg = - "connect to " + url + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + " not bind in /etc/hosts"; - errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL); - - log.warn(errorMsg); - } - return null; - } - - private static String clearNewLine(final String str) { - String newString = str.trim(); - int index = newString.indexOf("\r"); - if (index != -1) { - return newString.substring(0, index); - } - - index = newString.indexOf("\n"); - if (index != -1) { - return newString.substring(0, index); - } - - return newString; - } - - public String getNsAddr() { - return nsAddr; - } - - - public void setNsAddr(String nsAddr) { - this.nsAddr = nsAddr; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java deleted file mode 100644 index aaaa51d..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol; - -import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; -import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; -import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; - - -/** - * @author shijia.wxr - */ -public class MQProtosHelper { - public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr, - final long timeoutMillis) { - RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); - requestHeader.setBrokerAddr(brokerAddr); - - RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); - - try { - RemotingCommand response = RemotingHelper.invokeSync(nsaddr, request, timeoutMillis); - if (response != null) { - return ResponseCode.SUCCESS == response.getCode(); - } - } catch (RemotingConnectException e) { - e.printStackTrace(); - } catch (RemotingSendRequestException e) { - e.printStackTrace(); - } catch (RemotingTimeoutException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java deleted file mode 100644 index a8b8698..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol; - -public class RequestCode { - - public static final int SEND_MESSAGE = 10; - - public static final int PULL_MESSAGE = 11; - - public static final int QUERY_MESSAGE = 12; - public static final int QUERY_BROKER_OFFSET = 13; - public static final int QUERY_CONSUMER_OFFSET = 14; - public static final int UPDATE_CONSUMER_OFFSET = 15; - public static final int UPDATE_AND_CREATE_TOPIC = 17; - public static final int GET_ALL_TOPIC_CONFIG = 21; - public static final int GET_TOPIC_CONFIG_LIST = 22; - - public static final int GET_TOPIC_NAME_LIST = 23; - - public static final int UPDATE_BROKER_CONFIG = 25; - - public static final int GET_BROKER_CONFIG = 26; - - public static final int TRIGGER_DELETE_FILES = 27; - - public static final int GET_BROKER_RUNTIME_INFO = 28; - public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29; - public static final int GET_MAX_OFFSET = 30; - public static final int GET_MIN_OFFSET = 31; - - public static final int GET_EARLIEST_MSG_STORETIME = 32; - - public static final int VIEW_MESSAGE_BY_ID = 33; - - public static final int HEART_BEAT = 34; - - public static final int UNREGISTER_CLIENT = 35; - - public static final int CONSUMER_SEND_MSG_BACK = 36; - - public static final int END_TRANSACTION = 37; - public static final int GET_CONSUMER_LIST_BY_GROUP = 38; - - public static final int CHECK_TRANSACTION_STATE = 39; - - public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40; - - public static final int LOCK_BATCH_MQ = 41; - - public static final int UNLOCK_BATCH_MQ = 42; - public static final int GET_ALL_CONSUMER_OFFSET = 43; - - public static final int GET_ALL_DELAY_OFFSET = 45; - - public static final int PUT_KV_CONFIG = 100; - - public static final int GET_KV_CONFIG = 101; - - public static final int DELETE_KV_CONFIG = 102; - - public static final int REGISTER_BROKER = 103; - - public static final int UNREGISTER_BROKER = 104; - public static final int GET_ROUTEINTO_BY_TOPIC = 105; - - public static final int GET_BROKER_CLUSTER_INFO = 106; - public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200; - public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201; - public static final int GET_TOPIC_STATS_INFO = 202; - public static final int GET_CONSUMER_CONNECTION_LIST = 203; - public static final int GET_PRODUCER_CONNECTION_LIST = 204; - public static final int WIPE_WRITE_PERM_OF_BROKER = 205; - - - public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206; - - public static final int DELETE_SUBSCRIPTIONGROUP = 207; - public static final int GET_CONSUME_STATS = 208; - - public static final int SUSPEND_CONSUMER = 209; - - public static final int RESUME_CONSUMER = 210; - public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211; - public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212; - - public static final int ADJUST_CONSUMER_THREAD_POOL = 213; - - public static final int WHO_CONSUME_THE_MESSAGE = 214; - - - public static final int DELETE_TOPIC_IN_BROKER = 215; - - public static final int DELETE_TOPIC_IN_NAMESRV = 216; - public static final int GET_KVLIST_BY_NAMESPACE = 219; - - - public static final int RESET_CONSUMER_CLIENT_OFFSET = 220; - - public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221; - - public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222; - - public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223; - - - public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300; - - public static final int GET_TOPICS_BY_CLUSTER = 224; - - public static final int REGISTER_FILTER_SERVER = 301; - public static final int REGISTER_MESSAGE_FILTER_CLASS = 302; - - public static final int QUERY_CONSUME_TIME_SPAN = 303; - - public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304; - public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305; - - public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306; - - public static final int GET_CONSUMER_RUNNING_INFO = 307; - - public static final int QUERY_CORRECTION_OFFSET = 308; - public static final int CONSUME_MESSAGE_DIRECTLY = 309; - - public static final int SEND_MESSAGE_V2 = 310; - - public static final int GET_UNIT_TOPIC_LIST = 311; - - public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312; - - public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313; - - public static final int CLONE_GROUP_OFFSET = 314; - - public static final int VIEW_BROKER_STATS_DATA = 315; - - public static final int CLEAN_UNUSED_TOPIC = 316; - - public static final int GET_BROKER_CONSUME_STATS = 317; - - /** - * update the config of name server - */ - public static final int UPDATE_NAMESRV_CONFIG = 318; - - /** - * get config from name server - */ - public static final int GET_NAMESRV_CONFIG = 319; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java deleted file mode 100644 index 3c01fad..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol; - -import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode; - - -public class ResponseCode extends RemotingSysResponseCode { - - public static final int FLUSH_DISK_TIMEOUT = 10; - - public static final int SLAVE_NOT_AVAILABLE = 11; - - public static final int FLUSH_SLAVE_TIMEOUT = 12; - - public static final int MESSAGE_ILLEGAL = 13; - - public static final int SERVICE_NOT_AVAILABLE = 14; - - public static final int VERSION_NOT_SUPPORTED = 15; - - public static final int NO_PERMISSION = 16; - - public static final int TOPIC_NOT_EXIST = 17; - public static final int TOPIC_EXIST_ALREADY = 18; - public static final int PULL_NOT_FOUND = 19; - - public static final int PULL_RETRY_IMMEDIATELY = 20; - - public static final int PULL_OFFSET_MOVED = 21; - - public static final int QUERY_NOT_FOUND = 22; - - public static final int SUBSCRIPTION_PARSE_FAILED = 23; - - public static final int SUBSCRIPTION_NOT_EXIST = 24; - - public static final int SUBSCRIPTION_NOT_LATEST = 25; - - public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26; - - public static final int TRANSACTION_SHOULD_COMMIT = 200; - - public static final int TRANSACTION_SHOULD_ROLLBACK = 201; - - public static final int TRANSACTION_STATE_UNKNOW = 202; - - public static final int TRANSACTION_STATE_GROUP_WRONG = 203; - public static final int NO_BUYER_ID = 204; - - - public static final int NOT_IN_CURRENT_UNIT = 205; - - - public static final int CONSUMER_NOT_ONLINE = 206; - - - public static final int CONSUME_MSG_TIMEOUT = 207; - - - public static final int NO_MESSAGE = 208; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java deleted file mode 100644 index 6f51b06..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol.body; - -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; - - -public class BrokerStatsData extends RemotingSerializable { - - private BrokerStatsItem statsMinute; - - private BrokerStatsItem statsHour; - - private BrokerStatsItem statsDay; - - - public BrokerStatsItem getStatsMinute() { - return statsMinute; - } - - - public void setStatsMinute(BrokerStatsItem statsMinute) { - this.statsMinute = statsMinute; - } - - - public BrokerStatsItem getStatsHour() { - return statsHour; - } - - - public void setStatsHour(BrokerStatsItem statsHour) { - this.statsHour = statsHour; - } - - - public BrokerStatsItem getStatsDay() { - return statsDay; - } - - - public void setStatsDay(BrokerStatsItem statsDay) { - this.statsDay = statsDay; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java deleted file mode 100644 index 1cf6c3d..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol.body; - -public class BrokerStatsItem { - private long sum; - private double tps; - private double avgpt; - - - public long getSum() { - return sum; - } - - - public void setSum(long sum) { - this.sum = sum; - } - - - public double getTps() { - return tps; - } - - - public void setTps(double tps) { - this.tps = tps; - } - - - public double getAvgpt() { - return avgpt; - } - - - public void setAvgpt(double avgpt) { - this.avgpt = avgpt; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java deleted file mode 100644 index 873b548..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol.body; - -public enum CMResult { - CR_SUCCESS, - CR_LATER, - CR_ROLLBACK, - CR_COMMIT, - CR_THROW_EXCEPTION, - CR_RETURN_NULL, -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java deleted file mode 100644 index 81d6447..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol.body; - -import com.alibaba.rocketmq.common.protocol.route.BrokerData; -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; - - -/** - * @author shijia.wxr - */ -public class ClusterInfo extends RemotingSerializable { - private HashMap<String/* brokerName */, BrokerData> brokerAddrTable; - private HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; - - - public HashMap<String, BrokerData> getBrokerAddrTable() { - return brokerAddrTable; - } - - - public void setBrokerAddrTable(HashMap<String, BrokerData> brokerAddrTable) { - this.brokerAddrTable = brokerAddrTable; - } - - - public HashMap<String, Set<String>> getClusterAddrTable() { - return clusterAddrTable; - } - - - public void setClusterAddrTable(HashMap<String, Set<String>> clusterAddrTable) { - this.clusterAddrTable = clusterAddrTable; - } - - - public String[] retrieveAllAddrByCluster(String cluster) { - List<String> addrs = new ArrayList<String>(); - if (clusterAddrTable.containsKey(cluster)) { - Set<String> brokerNames = clusterAddrTable.get(cluster); - for (String brokerName : brokerNames) { - BrokerData brokerData = brokerAddrTable.get(brokerName); - if (null != brokerData) { - addrs.addAll(brokerData.getBrokerAddrs().values()); - } - } - } - - return addrs.toArray(new String[]{}); - } - - - public String[] retrieveAllClusterNames() { - return clusterAddrTable.keySet().toArray(new String[]{}); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java deleted file mode 100644 index 72cf601..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol.body; - -import com.alibaba.rocketmq.remoting.protocol.LanguageCode; - - -/** - * @author shijia.wxr - */ -public class Connection { - private String clientId; - private String clientAddr; - private LanguageCode language; - private int version; - - - public String getClientId() { - return clientId; - } - - - public void setClientId(String clientId) { - this.clientId = clientId; - } - - - public String getClientAddr() { - return clientAddr; - } - - - public void setClientAddr(String clientAddr) { - this.clientAddr = clientAddr; - } - - - public LanguageCode getLanguage() { - return language; - } - - - public void setLanguage(LanguageCode language) { - this.language = language; - } - - - public int getVersion() { - return version; - } - - - public void setVersion(int version) { - this.version = version; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java deleted file mode 100644 index 8a69352..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.protocol.body; - -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; - -import java.util.HashSet; - - -/** - * @author shijia.wxr - * - */ -public class ConsumeByWho extends RemotingSerializable { - private HashSet<String> consumedGroup = new HashSet<String>(); - private HashSet<String> notConsumedGroup = new HashSet<String>(); - private String topic; - private int queueId; - private long offset; - - - public HashSet<String> getConsumedGroup() { - return consumedGroup; - } - - - public void setConsumedGroup(HashSet<String> consumedGroup) { - this.consumedGroup = consumedGroup; - } - - - public HashSet<String> getNotConsumedGroup() { - return notConsumedGroup; - } - - - public void setNotConsumedGroup(HashSet<String> notConsumedGroup) { - this.notConsumedGroup = notConsumedGroup; - } - - - public String getTopic() { - return topic; - } - - - public void setTopic(String topic) { - this.topic = topic; - } - - - public int getQueueId() { - return queueId; - } - - - public void setQueueId(int queueId) { - this.queueId = queueId; - } - - - public long getOffset() { - return offset; - } - - - public void setOffset(long offset) { - this.offset = offset; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java deleted file mode 100644 index c895fe2..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol.body; - -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; - - -public class ConsumeMessageDirectlyResult extends RemotingSerializable { - private boolean order = false; - private boolean autoCommit = true; - private CMResult consumeResult; - private String remark; - private long spentTimeMills; - - - public boolean isOrder() { - return order; - } - - - public void setOrder(boolean order) { - this.order = order; - } - - - public boolean isAutoCommit() { - return autoCommit; - } - - - public void setAutoCommit(boolean autoCommit) { - this.autoCommit = autoCommit; - } - - - public String getRemark() { - return remark; - } - - - public void setRemark(String remark) { - this.remark = remark; - } - - - public CMResult getConsumeResult() { - return consumeResult; - } - - - public void setConsumeResult(CMResult consumeResult) { - this.consumeResult = consumeResult; - } - - - public long getSpentTimeMills() { - return spentTimeMills; - } - - - public void setSpentTimeMills(long spentTimeMills) { - this.spentTimeMills = spentTimeMills; - } - - - @Override - public String toString() { - return "ConsumeMessageDirectlyResult [order=" + order + ", autoCommit=" + autoCommit - + ", consumeResult=" + consumeResult + ", remark=" + remark + ", spentTimeMills=" - + spentTimeMills + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java deleted file mode 100644 index a1c608d..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.protocol.body; - -import com.alibaba.rocketmq.common.admin.ConsumeStats; -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - - -/** - * @author shijia.wxr - */ -public class ConsumeStatsList extends RemotingSerializable { - private List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsList = new ArrayList<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>>(); - private String brokerAddr; - private long totalDiff; - - public List<Map<String, List<ConsumeStats>>> getConsumeStatsList() { - return consumeStatsList; - } - - public void setConsumeStatsList(List<Map<String, List<ConsumeStats>>> consumeStatsList) { - this.consumeStatsList = consumeStatsList; - } - - public String getBrokerAddr() { - return brokerAddr; - } - - public void setBrokerAddr(String brokerAddr) { - this.brokerAddr = brokerAddr; - } - - public long getTotalDiff() { - return totalDiff; - } - - public void setTotalDiff(long totalDiff) { - this.totalDiff = totalDiff; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java deleted file mode 100644 index dcb6281..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol.body; - -public class ConsumeStatus { - private double pullRT; - private double pullTPS; - private double consumeRT; - private double consumeOKTPS; - private double consumeFailedTPS; - - private long consumeFailedMsgs; - - - public double getPullRT() { - return pullRT; - } - - - public void setPullRT(double pullRT) { - this.pullRT = pullRT; - } - - - public double getPullTPS() { - return pullTPS; - } - - - public void setPullTPS(double pullTPS) { - this.pullTPS = pullTPS; - } - - - public double getConsumeRT() { - return consumeRT; - } - - - public void setConsumeRT(double consumeRT) { - this.consumeRT = consumeRT; - } - - - public double getConsumeOKTPS() { - return consumeOKTPS; - } - - - public void setConsumeOKTPS(double consumeOKTPS) { - this.consumeOKTPS = consumeOKTPS; - } - - - public double getConsumeFailedTPS() { - return consumeFailedTPS; - } - - - public void setConsumeFailedTPS(double consumeFailedTPS) { - this.consumeFailedTPS = consumeFailedTPS; - } - - - public long getConsumeFailedMsgs() { - return consumeFailedMsgs; - } - - - public void setConsumeFailedMsgs(long consumeFailedMsgs) { - this.consumeFailedMsgs = consumeFailedMsgs; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java deleted file mode 100644 index f74c6fc..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol.body; - -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; - -import java.util.HashSet; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * @author shijia.wxr - */ -public class ConsumerConnection extends RemotingSerializable { - private HashSet<Connection> connectionSet = new HashSet<Connection>(); - private ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = - new ConcurrentHashMap<String, SubscriptionData>(); - private ConsumeType consumeType; - private MessageModel messageModel; - private ConsumeFromWhere consumeFromWhere; - - - public int computeMinVersion() { - int minVersion = Integer.MAX_VALUE; - for (Connection c : this.connectionSet) { - if (c.getVersion() < minVersion) { - minVersion = c.getVersion(); - } - } - - return minVersion; - } - - - public HashSet<Connection> getConnectionSet() { - return connectionSet; - } - - - public void setConnectionSet(HashSet<Connection> connectionSet) { - this.connectionSet = connectionSet; - } - - - public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() { - return subscriptionTable; - } - - - public void setSubscriptionTable(ConcurrentHashMap<String, SubscriptionData> subscriptionTable) { - this.subscriptionTable = subscriptionTable; - } - - - public ConsumeType getConsumeType() { - return consumeType; - } - - - public void setConsumeType(ConsumeType consumeType) { - this.consumeType = consumeType; - } - - - public MessageModel getMessageModel() { - return messageModel; - } - - - public void setMessageModel(MessageModel messageModel) { - this.messageModel = messageModel; - } - - - public ConsumeFromWhere getConsumeFromWhere() { - return consumeFromWhere; - } - - - public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { - this.consumeFromWhere = consumeFromWhere; - } -}
