http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java new file mode 100644 index 0000000..f74c6fc --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package com.alibaba.rocketmq.common.protocol.body;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;

import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Serializable body holding a set of {@link Connection} objects, a
 * topic-keyed table of {@link SubscriptionData}, and the consume type,
 * message model and consume-from-where setting that go with them.
 *
 * @author shijia.wxr
 */
public class ConsumerConnection extends RemotingSerializable {
    /** Connections carried by this body; starts out empty. */
    private HashSet<Connection> connectionSet = new HashSet<Connection>();
    /** Subscription data keyed by topic; starts out empty. */
    private ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
            new ConcurrentHashMap<String, SubscriptionData>();
    private ConsumeType consumeType;
    private MessageModel messageModel;
    private ConsumeFromWhere consumeFromWhere;


    /**
     * Finds the smallest version reported by any connection in the set.
     *
     * @return the minimum of {@code getVersion()} over all connections, or
     *         {@link Integer#MAX_VALUE} when the connection set is empty
     */
    public int computeMinVersion() {
        int lowest = Integer.MAX_VALUE;
        for (final Connection connection : this.connectionSet) {
            lowest = Math.min(lowest, connection.getVersion());
        }
        return lowest;
    }


    /** @return the live (not copied) connection set */
    public HashSet<Connection> getConnectionSet() {
        return this.connectionSet;
    }


    /** @param connectionSet the set that replaces the current connection set */
    public void setConnectionSet(final HashSet<Connection> connectionSet) {
        this.connectionSet = connectionSet;
    }


    /** @return the live (not copied) topic-to-subscription table */
    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
        return this.subscriptionTable;
    }


    /** @param subscriptionTable the table that replaces the current subscription table */
    public void setSubscriptionTable(final ConcurrentHashMap<String, SubscriptionData> subscriptionTable) {
        this.subscriptionTable = subscriptionTable;
    }


    /** @return the consume type, or {@code null} if it was never set */
    public ConsumeType getConsumeType() {
        return this.consumeType;
    }


    /** @param consumeType the consume type to store */
    public void setConsumeType(final ConsumeType consumeType) {
        this.consumeType = consumeType;
    }


    /** @return the message model, or {@code null} if it was never set */
    public MessageModel getMessageModel() {
        return this.messageModel;
    }


    /** @param messageModel the message model to store */
    public void setMessageModel(final MessageModel messageModel) {
        this.messageModel = messageModel;
    }


    /** @return the consume-from-where setting, or {@code null} if it was never set */
    public ConsumeFromWhere getConsumeFromWhere() {
        return this.consumeFromWhere;
    }


    /** @param consumeFromWhere the consume-from-where setting to store */
    public void setConsumeFromWhere(final ConsumeFromWhere consumeFromWhere) {
        this.consumeFromWhere = consumeFromWhere;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java new file mode 100644 index 0000000..7af4e7a --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package com.alibaba.rocketmq.common.protocol.body;

import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;

import java.util.concurrent.ConcurrentHashMap;


/**
 * Serializable wrapper around a two-level offset table: the outer key is a
 * {@code topic@group} string, the inner map goes from an integer key to a
 * long offset value.
 *
 * @author manhong.yqd
 */
public class ConsumerOffsetSerializeWrapper extends RemotingSerializable {
    /** Offset table, created with an initial capacity of 512. */
    private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);


    /** @return the live (not copied) offset table */
    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() {
        return this.offsetTable;
    }


    /** @param offsetTable the table that replaces the current offset table */
    public void setOffsetTable(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
        this.offsetTable = offsetTable;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java new file mode 100644 index 0000000..56babc2 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.rocketmq.common.protocol.body;

import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;

import java.util.*;
import java.util.Map.Entry;

/**
 * Snapshot of a consumer's runtime state: its properties, subscriptions,
 * per-queue processing info, per-topic consume statistics and an optional
 * jstack dump. Also offers static helpers that analyze such snapshots and
 * a plain-text report via {@link #formatString()}.
 */
public class ConsumerRunningInfo extends RemotingSerializable {
    public static final String PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";
    public static final String PROP_THREADPOOL_CORE_SIZE = "PROP_THREADPOOL_CORE_SIZE";
    public static final String PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY";
    public static final String PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE";
    public static final String PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION";
    public static final String PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP";

    /** A consumer must have been running longer than this before its subscription is checked. */
    private static final long STARTED_FOR_A_WHILE_MILLIS = 1000 * 60 * 2;

    /** A non-orderly queue idle for longer than this, with cached messages, is reported as blocked. */
    private static final long CONSUME_BLOCKED_MILLIS = 1000 * 60;


    private Properties properties = new Properties();

    private TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>();

    private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();

    private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();

    private String jstack;

    /**
     * Checks that every consumer in the table has the same, non-empty
     * subscription set.
     *
     * <p>The check only runs when the first entry describes a push
     * ({@code CONSUME_PASSIVELY}) consumer that started more than two minutes
     * ago; otherwise the method returns {@code true} without comparing.
     * A {@code null} or empty table, a missing consume type and a missing
     * start timestamp all yield {@code true} instead of an exception.
     *
     * @param criTable running info keyed by client id
     * @return {@code false} if two neighbouring entries differ in their
     *         subscription set or the subscription set is empty;
     *         {@code true} otherwise
     */
    public static boolean analyzeSubscription(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
        // firstEntry() returns null on an empty map; there is nothing to compare then.
        if (criTable == null || criTable.isEmpty()) {
            return true;
        }

        final ConsumerRunningInfo first = criTable.firstEntry().getValue();
        final boolean push = isPushConsumer(first.getProperties());
        final boolean startForAWhile = hasStartedForAWhile(first.getProperties());
        if (!push || !startForAWhile) {
            return true;
        }

        ConsumerRunningInfo prev = first;
        for (final ConsumerRunningInfo current : criTable.values()) {
            if (!current.getSubscriptionSet().equals(prev.getSubscriptionSet())) {
                // Different subscription in the same group of consumer
                return false;
            }
            prev = current;
        }

        // All sets are equal at this point, so checking the last one covers them all.
        // An empty subscription is reported as a failure.
        return !prev.getSubscriptionSet().isEmpty();
    }

    /**
     * Tells whether the properties describe a push consumer.
     * The consume type may be stored as a string or as a {@link ConsumeType}
     * value; {@code getProperty} only sees the former, hence the fallback.
     *
     * @return {@code false} when the property is absent
     * @throws IllegalArgumentException if the stored text is not a {@link ConsumeType} name
     */
    private static boolean isPushConsumer(final Properties props) {
        String typeName = props.getProperty(PROP_CONSUME_TYPE);
        if (typeName == null) {
            final Object raw = props.get(PROP_CONSUME_TYPE);
            if (raw == null) {
                return false;
            }
            typeName = (raw instanceof ConsumeType) ? ((ConsumeType) raw).name() : raw.toString();
        }
        return ConsumeType.valueOf(typeName) == ConsumeType.CONSUME_PASSIVELY;
    }

    /**
     * Tells whether the recorded start timestamp lies more than
     * {@link #STARTED_FOR_A_WHILE_MILLIS} in the past.
     *
     * @return {@code false} when no start timestamp is recorded
     * @throws NumberFormatException if the stored value is not a decimal long
     */
    private static boolean hasStartedForAWhile(final Properties props) {
        String startMillis = props.getProperty(PROP_CONSUMER_START_TIMESTAMP);
        if (startMillis == null) {
            final Object raw = props.get(PROP_CONSUMER_START_TIMESTAMP);
            if (raw == null) {
                return false;
            }
            startMillis = String.valueOf(raw);
        }
        return (System.currentTimeMillis() - Long.parseLong(startMillis)) > STARTED_FOR_A_WHILE_MILLIS;
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    public TreeSet<SubscriptionData> getSubscriptionSet() {
        return subscriptionSet;
    }

    public void setSubscriptionSet(TreeSet<SubscriptionData> subscriptionSet) {
        this.subscriptionSet = subscriptionSet;
    }

    /**
     * Placeholder: performs no analysis.
     *
     * @param criTable running info keyed by client id (ignored)
     * @return always {@code true}
     */
    public static boolean analyzeRebalance(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
        return true;
    }

    /**
     * Builds a textual list of suspicious process queues for one push consumer.
     *
     * <p>For orderly consumption a queue is reported when it is not locked, or
     * when it is locked, dropped and has a positive unlock-attempt count.
     * Otherwise a queue is reported when its last consume time is more than
     * one minute old while messages are still cached.
     *
     * @param clientId identifier printed at the start of each report line
     * @param info     running info of the consumer to inspect
     * @return the report, one line per finding; empty when nothing was found,
     *         the consumer is not a push consumer, or its consume type is absent
     */
    public static String analyzeProcessQueue(final String clientId, ConsumerRunningInfo info) {
        StringBuilder sb = new StringBuilder();
        if (!isPushConsumer(info.getProperties())) {
            return sb.toString();
        }

        final boolean orderMsg =
                Boolean.parseBoolean(info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_ORDERLY));

        for (final Entry<MessageQueue, ProcessQueueInfo> entry : info.getMqTable().entrySet()) {
            final MessageQueue mq = entry.getKey();
            final ProcessQueueInfo pq = entry.getValue();

            if (orderMsg) {
                if (!pq.isLocked()) {
                    sb.append(String.format("%s %s can't lock for a while, %dms%n", //
                            clientId, //
                            mq, //
                            System.currentTimeMillis() - pq.getLastLockTimestamp()));
                } else if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) {
                    sb.append(String.format("%s %s unlock %d times, still failed%n", //
                            clientId, //
                            mq, //
                            pq.getTryUnlockTimes()));
                }
            } else {
                final long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp();
                if (diff > CONSUME_BLOCKED_MILLIS && pq.getCachedMsgCount() > 0) {
                    sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", //
                            clientId, //
                            mq, //
                            diff));
                }
            }
        }

        return sb.toString();
    }

    public TreeMap<MessageQueue, ProcessQueueInfo> getMqTable() {
        return mqTable;
    }

    public void setMqTable(TreeMap<MessageQueue, ProcessQueueInfo> mqTable) {
        this.mqTable = mqTable;
    }

    public TreeMap<String, ConsumeStatus> getStatusTable() {
        return statusTable;
    }

    public void setStatusTable(TreeMap<String, ConsumeStatus> statusTable) {
        this.statusTable = statusTable;
    }

    /**
     * Renders this snapshot as a multi-section plain-text report: properties,
     * subscriptions, consumer offsets, per-queue detail, RT/TPS statistics
     * and, when present, the jstack dump.
     *
     * @return the formatted report
     */
    public String formatString() {
        StringBuilder sb = new StringBuilder();

        sb.append("#Consumer Properties#\n");
        for (final Entry<Object, Object> prop : this.properties.entrySet()) {
            sb.append(String.format("%-40s: %s%n", prop.getKey().toString(), prop.getValue().toString()));
        }

        sb.append("\n\n#Consumer Subscription#\n");
        int index = 0;
        for (final SubscriptionData sub : this.subscriptionSet) {
            sb.append(String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", //
                    ++index, //
                    sub.getTopic(), //
                    sub.isClassFilterMode(), //
                    sub.getSubString()));
        }

        sb.append("\n\n#Consumer Offset#\n");
        sb.append(String.format("%-32s %-32s %-4s %-20s%n", //
                "#Topic", //
                "#Broker Name", //
                "#QID", //
                "#Consumer Offset"//
        ));
        for (final Entry<MessageQueue, ProcessQueueInfo> entry : this.mqTable.entrySet()) {
            sb.append(String.format("%-32s %-32s %-4d %-20d%n", //
                    entry.getKey().getTopic(), //
                    entry.getKey().getBrokerName(), //
                    entry.getKey().getQueueId(), //
                    entry.getValue().getCommitOffset()));
        }

        sb.append("\n\n#Consumer MQ Detail#\n");
        sb.append(String.format("%-32s %-32s %-4s %-20s%n", //
                "#Topic", //
                "#Broker Name", //
                "#QID", //
                "#ProcessQueueInfo"//
        ));
        for (final Entry<MessageQueue, ProcessQueueInfo> entry : this.mqTable.entrySet()) {
            sb.append(String.format("%-32s %-32s %-4d %s%n", //
                    entry.getKey().getTopic(), //
                    entry.getKey().getBrokerName(), //
                    entry.getKey().getQueueId(), //
                    entry.getValue().toString()));
        }

        sb.append("\n\n#Consumer RT&TPS#\n");
        sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", //
                "#Topic", //
                "#Pull RT", //
                "#Pull TPS", //
                "#Consume RT", //
                "#ConsumeOK TPS", //
                "#ConsumeFailed TPS", //
                "#ConsumeFailedMsgsInHour"//
        ));
        for (final Entry<String, ConsumeStatus> entry : this.statusTable.entrySet()) {
            final ConsumeStatus status = entry.getValue();
            sb.append(String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", //
                    entry.getKey(), //
                    status.getPullRT(), //
                    status.getPullTPS(), //
                    status.getConsumeRT(), //
                    status.getConsumeOKTPS(), //
                    status.getConsumeFailedTPS(), //
                    status.getConsumeFailedMsgs()//
            ));
        }

        if (this.jstack != null) {
            sb.append("\n\n#Consumer jstack#\n");
            sb.append(this.jstack);
        }

        return sb.toString();
    }

    public String getJstack() {
        return jstack;
    }


    public void setJstack(String jstack) {
        this.jstack = jstack;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java 
b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java new file mode 100644 index 0000000..ca84f21 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package com.alibaba.rocketmq.common.protocol.body;

import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;

import java.util.HashMap;
import java.util.Map;


/**
 * Deprecated serializable body with two tables: one mapping a
 * {@link MessageQueue} to a long value, and one mapping a string key to
 * such a per-queue table.
 *
 * @author manhong.yqd
 */
@Deprecated
public class GetConsumerStatusBody extends RemotingSerializable {
    /** Long value per message queue; starts out empty. */
    private Map<MessageQueue, Long> messageQueueTable = new HashMap<MessageQueue, Long>();
    /** Per-queue tables grouped under a string key; starts out empty. */
    private Map<String, Map<MessageQueue, Long>> consumerTable =
            new HashMap<String, Map<MessageQueue, Long>>();


    /** @return the live (not copied) per-queue table */
    public Map<MessageQueue, Long> getMessageQueueTable() {
        return this.messageQueueTable;
    }


    /** @param messageQueueTable the table that replaces the current per-queue table */
    public void setMessageQueueTable(final Map<MessageQueue, Long> messageQueueTable) {
        this.messageQueueTable = messageQueueTable;
    }


    /** @return the live (not copied) consumer table */
    public Map<String, Map<MessageQueue, Long>> getConsumerTable() {
        return this.consumerTable;
    }


    /** @param consumerTable the table that replaces the current consumer table */
    public void setConsumerTable(final Map<String, Map<MessageQueue, Long>> consumerTable) {
        this.consumerTable = consumerTable;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java new file mode 100644 index 0000000..9f7e500 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.common.protocol.body;

import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;

import java.util.HashSet;


/**
 * Serializable body that carries a set of group name strings.
 *
 * @author shijia.wxr
 */
public class GroupList extends RemotingSerializable {
    /** The group names; starts out empty. */
    private HashSet<String> groupList = new HashSet<String>();


    /** @return the live (not copied) set of group names */
    public HashSet<String> getGroupList() {
        return this.groupList;
    }


    /** @param groupList the set that replaces the current group names */
    public void setGroupList(final HashSet<String> groupList) {
        this.groupList = groupList;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java new file mode 100644 index 0000000..41cfcb8 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.common.protocol.body;

import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;

import java.util.HashMap;


/**
 * Serializable body that carries a string-to-string key/value table.
 *
 * @author shijia.wxr
 */
public class KVTable extends RemotingSerializable {
    /** The key/value pairs; starts out empty. */
    private HashMap<String, String> table = new HashMap<String, String>();


    /** @return the live (not copied) key/value table */
    public HashMap<String, String> getTable() {
        return this.table;
    }


    /** @param table the map that replaces the current key/value table */
    public void setTable(final HashMap<String, String> table) {
        this.table = table;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java new file mode 100644 index 0000000..992f656 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.rocketmq.common.protocol.body;

import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;

import java.util.HashSet;
import java.util.Set;


/**
 * Serializable request body naming a consumer group, a client id and a set
 * of message queues.
 *
 * @author shijia.wxr
 */
public class LockBatchRequestBody extends RemotingSerializable {
    private String consumerGroup;
    private String clientId;
    /** The message queues named by the request; starts out empty. */
    private Set<MessageQueue> mqSet = new HashSet<MessageQueue>();


    /** @return the consumer group, or {@code null} if it was never set */
    public String getConsumerGroup() {
        return this.consumerGroup;
    }


    /** @param consumerGroup the consumer group to store */
    public void setConsumerGroup(final String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }


    /** @return the client id, or {@code null} if it was never set */
    public String getClientId() {
        return this.clientId;
    }


    /** @param clientId the client id to store */
    public void setClientId(final String clientId) {
        this.clientId = clientId;
    }


    /** @return the live (not copied) set of message queues */
    public Set<MessageQueue> getMqSet() {
        return this.mqSet;
    }


    /** @param mqSet the set that replaces the current message queues */
    public void setMqSet(final Set<MessageQueue> mqSet) {
        this.mqSet = mqSet;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java new file mode 100644 index 0000000..12f6c76 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java @@ -0,0 +1,45 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.rocketmq.common.protocol.body;

import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;

import java.util.HashSet;
import java.util.Set;


/**
 * Serializable response body that carries the set of message queues named
 * {@code lockOKMQSet}.
 *
 * @author shijia.wxr
 */
public class LockBatchResponseBody extends RemotingSerializable {

    /** The message queues reported back; starts out empty. */
    private Set<MessageQueue> lockOKMQSet = new HashSet<MessageQueue>();


    /** @return the live (not copied) set of message queues */
    public Set<MessageQueue> getLockOKMQSet() {
        return this.lockOKMQSet;
    }


    /** @param lockOKMQSet the set that replaces the current message queues */
    public void setLockOKMQSet(final Set<MessageQueue> lockOKMQSet) {
        this.lockOKMQSet = lockOKMQSet;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java new file mode 100644 index 0000000..6c17443 --- /dev/null +++ 
b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package com.alibaba.rocketmq.common.protocol.body;

import com.alibaba.rocketmq.common.UtilAll;


/**
 * Plain value holder for the state of one process queue: commit offset,
 * cached-message and transaction-message offset ranges and counts, lock
 * state, and the timestamps of the last lock, pull and consume.
 */
public class ProcessQueueInfo {
    private long commitOffset;

    private long cachedMsgMinOffset;
    private long cachedMsgMaxOffset;
    private int cachedMsgCount;

    private long transactionMsgMinOffset;
    private long transactionMsgMaxOffset;
    private int transactionMsgCount;

    private boolean locked;
    private long tryUnlockTimes;
    private long lastLockTimestamp;

    private boolean droped;
    private long lastPullTimestamp;
    private long lastConsumeTimestamp;


    /** @return the commit offset */
    public long getCommitOffset() {
        return this.commitOffset;
    }


    /** @param commitOffset the commit offset to store */
    public void setCommitOffset(final long commitOffset) {
        this.commitOffset = commitOffset;
    }


    /** @return the smallest cached message offset */
    public long getCachedMsgMinOffset() {
        return this.cachedMsgMinOffset;
    }


    /** @param cachedMsgMinOffset the smallest cached message offset to store */
    public void setCachedMsgMinOffset(final long cachedMsgMinOffset) {
        this.cachedMsgMinOffset = cachedMsgMinOffset;
    }


    /** @return the largest cached message offset */
    public long getCachedMsgMaxOffset() {
        return this.cachedMsgMaxOffset;
    }


    /** @param cachedMsgMaxOffset the largest cached message offset to store */
    public void setCachedMsgMaxOffset(final long cachedMsgMaxOffset) {
        this.cachedMsgMaxOffset = cachedMsgMaxOffset;
    }


    /** @return the number of cached messages */
    public int getCachedMsgCount() {
        return this.cachedMsgCount;
    }


    /** @param cachedMsgCount the number of cached messages to store */
    public void setCachedMsgCount(final int cachedMsgCount) {
        this.cachedMsgCount = cachedMsgCount;
    }


    /** @return the smallest transaction message offset */
    public long getTransactionMsgMinOffset() {
        return this.transactionMsgMinOffset;
    }


    /** @param transactionMsgMinOffset the smallest transaction message offset to store */
    public void setTransactionMsgMinOffset(final long transactionMsgMinOffset) {
        this.transactionMsgMinOffset = transactionMsgMinOffset;
    }


    /** @return the largest transaction message offset */
    public long getTransactionMsgMaxOffset() {
        return this.transactionMsgMaxOffset;
    }


    /** @param transactionMsgMaxOffset the largest transaction message offset to store */
    public void setTransactionMsgMaxOffset(final long transactionMsgMaxOffset) {
        this.transactionMsgMaxOffset = transactionMsgMaxOffset;
    }


    /** @return the number of transaction messages */
    public int getTransactionMsgCount() {
        return this.transactionMsgCount;
    }


    /** @param transactionMsgCount the number of transaction messages to store */
    public void setTransactionMsgCount(final int transactionMsgCount) {
        this.transactionMsgCount = transactionMsgCount;
    }


    /** @return the locked flag */
    public boolean isLocked() {
        return this.locked;
    }


    /** @param locked the locked flag to store */
    public void setLocked(final boolean locked) {
        this.locked = locked;
    }


    /** @return the unlock-attempt counter */
    public long getTryUnlockTimes() {
        return this.tryUnlockTimes;
    }


    /** @param tryUnlockTimes the unlock-attempt counter to store */
    public void setTryUnlockTimes(final long tryUnlockTimes) {
        this.tryUnlockTimes = tryUnlockTimes;
    }


    /** @return the last lock timestamp */
    public long getLastLockTimestamp() {
        return this.lastLockTimestamp;
    }


    /** @param lastLockTimestamp the last lock timestamp to store */
    public void setLastLockTimestamp(final long lastLockTimestamp) {
        this.lastLockTimestamp = lastLockTimestamp;
    }


    /** @return the dropped flag (the spelling "droped" is part of the public name) */
    public boolean isDroped() {
        return this.droped;
    }


    /** @param droped the dropped flag to store */
    public void setDroped(final boolean droped) {
        this.droped = droped;
    }


    /** @return the last pull timestamp */
    public long getLastPullTimestamp() {
        return this.lastPullTimestamp;
    }


    /** @param lastPullTimestamp the last pull timestamp to store */
    public void setLastPullTimestamp(final long lastPullTimestamp) {
        this.lastPullTimestamp = lastPullTimestamp;
    }


    /** @return the last consume timestamp */
    public long getLastConsumeTimestamp() {
        return this.lastConsumeTimestamp;
    }


    /** @param lastConsumeTimestamp the last consume timestamp to store */
    public void setLastConsumeTimestamp(final long lastConsumeTimestamp) {
        this.lastConsumeTimestamp = lastConsumeTimestamp;
    }


    /**
     * Lists every field as {@code name=value}; the three timestamps are
     * rendered through {@code UtilAll.timeMillisToHumanString}.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("ProcessQueueInfo [commitOffset=");
        text.append(this.commitOffset);
        text.append(", cachedMsgMinOffset=").append(this.cachedMsgMinOffset);
        text.append(", cachedMsgMaxOffset=").append(this.cachedMsgMaxOffset);
        text.append(", cachedMsgCount=").append(this.cachedMsgCount);
        text.append(", transactionMsgMinOffset=").append(this.transactionMsgMinOffset);
        text.append(", transactionMsgMaxOffset=").append(this.transactionMsgMaxOffset);
        text.append(", transactionMsgCount=").append(this.transactionMsgCount);
        text.append(", locked=").append(this.locked);
        text.append(", tryUnlockTimes=").append(this.tryUnlockTimes);
        text.append(", lastLockTimestamp=").append(UtilAll.timeMillisToHumanString(this.lastLockTimestamp));
        text.append(", droped=").append(this.droped);
        text.append(", lastPullTimestamp=").append(UtilAll.timeMillisToHumanString(this.lastPullTimestamp));
        text.append(", lastConsumeTimestamp=").append(UtilAll.timeMillisToHumanString(this.lastConsumeTimestamp));
        text.append("]");
        return text.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java new file mode 100644 index 0000000..32fe1d0 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashSet; + + +/** + * @author shijia.wxr + */ +public class ProducerConnection extends RemotingSerializable { + private HashSet<Connection> connectionSet = new HashSet<Connection>(); + + + public HashSet<Connection> getConnectionSet() { + return connectionSet; + } + + + public void setConnectionSet(HashSet<Connection> connectionSet) { + this.connectionSet = connectionSet; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java new file mode 100644 index 0000000..2f52666 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.ArrayList; +import java.util.List; + + +/** + * @author manhong.yqd + */ +public class QueryConsumeTimeSpanBody extends RemotingSerializable { + List<QueueTimeSpan> consumeTimeSpanSet = new ArrayList<QueueTimeSpan>(); + + + public List<QueueTimeSpan> getConsumeTimeSpanSet() { + return consumeTimeSpanSet; + } + + + public void setConsumeTimeSpanSet(List<QueueTimeSpan> consumeTimeSpanSet) { + this.consumeTimeSpanSet = consumeTimeSpanSet; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java new file mode 100644 index 0000000..225b90c --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashMap; +import java.util.Map; + + +/** + * @author manhong.yqd + */ +public class QueryCorrectionOffsetBody extends RemotingSerializable { + private Map<Integer, Long> correctionOffsets = new HashMap<Integer, Long>(); + + + public Map<Integer, Long> getCorrectionOffsets() { + return correctionOffsets; + } + + + public void setCorrectionOffsets(Map<Integer, Long> correctionOffsets) { + this.correctionOffsets = correctionOffsets; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java new file mode 100644 index 0000000..14001ec --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.message.MessageQueue; + +import java.util.Date; + + +/** + * @author manhong.yqd + */ +public class QueueTimeSpan { + private MessageQueue messageQueue; + private long minTimeStamp; + private long maxTimeStamp; + private long consumeTimeStamp; + private long delayTime; + + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + + public void setMessageQueue(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + + public long getMinTimeStamp() { + return minTimeStamp; + } + + + public void setMinTimeStamp(long minTimeStamp) { + this.minTimeStamp = minTimeStamp; + } + + + public long getMaxTimeStamp() { + return maxTimeStamp; + } + + + public void setMaxTimeStamp(long maxTimeStamp) { + this.maxTimeStamp = maxTimeStamp; + } + + + public long getConsumeTimeStamp() { + return consumeTimeStamp; + } + + + public void setConsumeTimeStamp(long consumeTimeStamp) { + this.consumeTimeStamp = consumeTimeStamp; + } + + + public String getMinTimeStampStr() { + return UtilAll.formatDate(new Date(minTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS); + } + + + public String getMaxTimeStampStr() { + return UtilAll.formatDate(new 
Date(maxTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS); + } + + + public String getConsumeTimeStampStr() { + return UtilAll.formatDate(new Date(consumeTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS); + } + + + public long getDelayTime() { + return delayTime; + } + + + public void setDelayTime(long delayTime) { + this.delayTime = delayTime; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java new file mode 100644 index 0000000..364bbcb --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.ArrayList; +import java.util.List; + + +public class RegisterBrokerBody extends RemotingSerializable { + private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + private List<String> filterServerList = new ArrayList<String>(); + + + public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() { + return topicConfigSerializeWrapper; + } + + + public void setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConfigSerializeWrapper) { + this.topicConfigSerializeWrapper = topicConfigSerializeWrapper; + } + + + public List<String> getFilterServerList() { + return filterServerList; + } + + + public void setFilterServerList(List<String> filterServerList) { + this.filterServerList = filterServerList; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java new file mode 100644 index 0000000..2122e61 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.Map; + + +/** + * @author manhong.yqd + * + */ +public class ResetOffsetBody extends RemotingSerializable { + private Map<MessageQueue, Long> offsetTable; + + + public Map<MessageQueue, Long> getOffsetTable() { + return offsetTable; + } + + + public void setOffsetTable(Map<MessageQueue, Long> offsetTable) { + this.offsetTable = offsetTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java new file mode 100644 index 0000000..fb7360e --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.common.message.MessageQueueForC; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.List; + +public class ResetOffsetBodyForC extends RemotingSerializable { + + private List<MessageQueueForC> offsetTable; + + + public List<MessageQueueForC> getOffsetTable() { + return offsetTable; + } + + + public void setOffsetTable(List<MessageQueueForC> offsetTable) { + this.offsetTable = offsetTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java new file mode 100644 index 0000000..096672c --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.common.DataVersion; +import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author manhong.yqd + */ +public class SubscriptionGroupWrapper extends RemotingSerializable { + private ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable = + new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024); + private DataVersion dataVersion = new DataVersion(); + + + public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() { + return subscriptionGroupTable; + } + + + public void setSubscriptionGroupTable( + ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable) { + this.subscriptionGroupTable = subscriptionGroupTable; + } + + + public DataVersion getDataVersion() { + return dataVersion; + } + + + public void setDataVersion(DataVersion dataVersion) { + this.dataVersion = dataVersion; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java new 
file mode 100644 index 0000000..0050762 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.common.DataVersion; +import com.alibaba.rocketmq.common.TopicConfig; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.concurrent.ConcurrentHashMap; + + +public class TopicConfigSerializeWrapper extends RemotingSerializable { + private ConcurrentHashMap<String, TopicConfig> topicConfigTable = + new ConcurrentHashMap<String, TopicConfig>(); + private DataVersion dataVersion = new DataVersion(); + + + public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() { + return topicConfigTable; + } + + + public void setTopicConfigTable(ConcurrentHashMap<String, TopicConfig> topicConfigTable) { + this.topicConfigTable = topicConfigTable; + } + + + public DataVersion getDataVersion() { + return dataVersion; + } + + + public void setDataVersion(DataVersion dataVersion) { + this.dataVersion = dataVersion; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java new file mode 100644 index 0000000..84912ce --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashSet; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class TopicList extends RemotingSerializable { + private Set<String> topicList = new HashSet<String>(); + private String brokerAddr; + + + public Set<String> getTopicList() { + return topicList; + } + + + public void setTopicList(Set<String> topicList) { + this.topicList = topicList; + } + + + public String getBrokerAddr() { + return brokerAddr; + } + + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java new file mode 100644 index 0000000..542b797 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashSet; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class UnlockBatchRequestBody extends RemotingSerializable { + private String consumerGroup; + private String clientId; + private Set<MessageQueue> mqSet = new HashSet<MessageQueue>(); + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public String getClientId() { + return clientId; + } + + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + + public Set<MessageQueue> getMqSet() { + return mqSet; + } + + + public void setMqSet(Set<MessageQueue> mqSet) { + this.mqSet = mqSet; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java new file mode 100644 index 0000000..37d6a7f --- /dev/null +++ 
b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package com.alibaba.rocketmq.common.protocol.header; + +import com.alibaba.rocketmq.remoting.CommandCustomHeader; +import com.alibaba.rocketmq.remoting.annotation.CFNotNull; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + */ +public class CheckTransactionStateRequestHeader implements CommandCustomHeader { + @CFNotNull + private Long tranStateTableOffset; + @CFNotNull + private Long commitLogOffset; + private String msgId; + private String transactionId; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public Long getTranStateTableOffset() { + return tranStateTableOffset; + } + + + public void setTranStateTableOffset(Long tranStateTableOffset) { + this.tranStateTableOffset = tranStateTableOffset; + } + + + public Long getCommitLogOffset() { + return commitLogOffset; + } + + + public void setCommitLogOffset(Long commitLogOffset) { + 
this.commitLogOffset = commitLogOffset; + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public String getTransactionId() { + return transactionId; + } + + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java new file mode 100644 index 0000000..76c9732 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package com.alibaba.rocketmq.common.protocol.header; + +import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; +import com.alibaba.rocketmq.remoting.CommandCustomHeader; +import com.alibaba.rocketmq.remoting.annotation.CFNotNull; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + */ +public class CheckTransactionStateResponseHeader implements CommandCustomHeader { + @CFNotNull + private String producerGroup; + @CFNotNull + private Long tranStateTableOffset; + @CFNotNull + private Long commitLogOffset; + @CFNotNull + private Integer commitOrRollback; // TRANSACTION_COMMIT_TYPE + + + // TRANSACTION_ROLLBACK_TYPE + + @Override + public void checkFields() throws RemotingCommandException { + if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == this.commitOrRollback) { + return; + } + + if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == this.commitOrRollback) { + return; + } + + throw new RemotingCommandException("commitOrRollback field wrong"); + } + + + public String getProducerGroup() { + return producerGroup; + } + + + public void setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } + + + public Long getTranStateTableOffset() { + return tranStateTableOffset; + } + + + public void setTranStateTableOffset(Long tranStateTableOffset) { + this.tranStateTableOffset = tranStateTableOffset; + } + + + public Long getCommitLogOffset() { + return commitLogOffset; + } + + + public void setCommitLogOffset(Long commitLogOffset) { + this.commitLogOffset = commitLogOffset; + } + + + public Integer getCommitOrRollback() { + return commitOrRollback; + } + + + public void setCommitOrRollback(Integer commitOrRollback) { + this.commitOrRollback = commitOrRollback; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java new file mode 100644 index 0000000..6043229 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package com.alibaba.rocketmq.common.protocol.header; + +import com.alibaba.rocketmq.remoting.CommandCustomHeader; +import com.alibaba.rocketmq.remoting.annotation.CFNotNull; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author manhong.yqd + */ +public class CloneGroupOffsetRequestHeader implements CommandCustomHeader { + @CFNotNull + private String srcGroup; + @CFNotNull + private String destGroup; + private String topic; + private boolean offline; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getDestGroup() { + return destGroup; + } + + + public void setDestGroup(String destGroup) { + this.destGroup = destGroup; + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public String getSrcGroup() { + + return srcGroup; + } + + + public void setSrcGroup(String srcGroup) { + this.srcGroup = srcGroup; + } + + + public boolean isOffline() { + return offline; + } + + + public void setOffline(boolean offline) { + this.offline = offline; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java new file mode 100644 index 0000000..3c68636 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol.header; + +import com.alibaba.rocketmq.remoting.CommandCustomHeader; +import com.alibaba.rocketmq.remoting.annotation.CFNotNull; +import com.alibaba.rocketmq.remoting.annotation.CFNullable; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; + + +public class ConsumeMessageDirectlyResultRequestHeader implements CommandCustomHeader { + @CFNotNull + private String consumerGroup; + @CFNullable + private String clientId; + @CFNullable + private String msgId; + @CFNullable + private String brokerName; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public String getBrokerName() { + return brokerName; + } + + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + + public String getClientId() { + return clientId; + } + + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + + public String getMsgId() { + return msgId; + } + + + public void setMsgId(String msgId) { + this.msgId = msgId; + } 
+} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java new file mode 100644 index 0000000..c0acf88 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common.protocol.header; + +import com.alibaba.rocketmq.remoting.CommandCustomHeader; +import com.alibaba.rocketmq.remoting.annotation.CFNotNull; +import com.alibaba.rocketmq.remoting.annotation.CFNullable; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + * + */ +public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader { + @CFNotNull + private Long offset; + @CFNotNull + private String group; + @CFNotNull + private Integer delayLevel; + private String originMsgId; + private String originTopic; + @CFNullable + private boolean unitMode = false; + private Integer maxReconsumeTimes; + + + @Override + public void checkFields() throws RemotingCommandException { + + } + + + public Long getOffset() { + return offset; + } + + + public void setOffset(Long offset) { + this.offset = offset; + } + + + public String getGroup() { + return group; + } + + + public void setGroup(String group) { + this.group = group; + } + + + public Integer getDelayLevel() { + return delayLevel; + } + + + public void setDelayLevel(Integer delayLevel) { + this.delayLevel = delayLevel; + } + + + public String getOriginMsgId() { + return originMsgId; + } + + + public void setOriginMsgId(String originMsgId) { + this.originMsgId = originMsgId; + } + + + public String getOriginTopic() { + return originTopic; + } + + + public void setOriginTopic(String originTopic) { + this.originTopic = originTopic; + } + + + public boolean isUnitMode() { + return unitMode; + } + + + public void setUnitMode(boolean unitMode) { + this.unitMode = unitMode; + } + + + public Integer getMaxReconsumeTimes() { + return maxReconsumeTimes; + } + + + public void setMaxReconsumeTimes(final Integer maxReconsumeTimes) { + this.maxReconsumeTimes = maxReconsumeTimes; + } + + + @Override + public String toString() { + return "ConsumerSendMsgBackRequestHeader [group=" + group + ", originTopic=" + originTopic + ", 
originMsgId=" + originMsgId + + ", delayLevel=" + delayLevel + ", unitMode=" + unitMode + ", maxReconsumeTimes=" + maxReconsumeTimes + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java new file mode 100644 index 0000000..a9d219c --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * $Id: CreateTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package com.alibaba.rocketmq.common.protocol.header; + +import com.alibaba.rocketmq.common.TopicFilterType; +import com.alibaba.rocketmq.remoting.CommandCustomHeader; +import com.alibaba.rocketmq.remoting.annotation.CFNotNull; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + */ +public class CreateTopicRequestHeader implements CommandCustomHeader { + @CFNotNull + private String topic; + @CFNotNull + private String defaultTopic; + @CFNotNull + private Integer readQueueNums; + @CFNotNull + private Integer writeQueueNums; + @CFNotNull + private Integer perm; + @CFNotNull + private String topicFilterType; + private Integer topicSysFlag; + @CFNotNull + private Boolean order = false; + + + @Override + public void checkFields() throws RemotingCommandException { + try { + TopicFilterType.valueOf(this.topicFilterType); + } catch (Exception e) { + throw new RemotingCommandException("topicFilterType = [" + topicFilterType + "] value invalid", e); + } + } + + + public TopicFilterType getTopicFilterTypeEnum() { + return TopicFilterType.valueOf(this.topicFilterType); + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public String getDefaultTopic() { + return defaultTopic; + } + + + public void setDefaultTopic(String defaultTopic) { + this.defaultTopic = defaultTopic; + } + + + public Integer getReadQueueNums() { + return readQueueNums; + } + + + public void setReadQueueNums(Integer readQueueNums) { + this.readQueueNums = readQueueNums; + } + + + public Integer getWriteQueueNums() { + return writeQueueNums; + } + + + public void setWriteQueueNums(Integer writeQueueNums) { + this.writeQueueNums = writeQueueNums; + } + + + public Integer getPerm() { + return perm; + } + + + public void setPerm(Integer perm) { + this.perm = perm; + } + + + public 
String getTopicFilterType() { + return topicFilterType; + } + + + public void setTopicFilterType(String topicFilterType) { + this.topicFilterType = topicFilterType; + } + + + public Integer getTopicSysFlag() { + return topicSysFlag; + } + + + public void setTopicSysFlag(Integer topicSysFlag) { + this.topicSysFlag = topicSysFlag; + } + + + public Boolean getOrder() { + return order; + } + + + public void setOrder(Boolean order) { + this.order = order; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java new file mode 100644 index 0000000..9307c01 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.common.protocol.header; + +import com.alibaba.rocketmq.remoting.CommandCustomHeader; +import com.alibaba.rocketmq.remoting.annotation.CFNotNull; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author manhong.yqd + */ +public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader { + @CFNotNull + private String groupName; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getGroupName() { + return groupName; + } + + + public void setGroupName(String groupName) { + this.groupName = groupName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java new file mode 100644 index 0000000..4b1a844 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $ + */ +package com.alibaba.rocketmq.common.protocol.header; + +import com.alibaba.rocketmq.remoting.CommandCustomHeader; +import com.alibaba.rocketmq.remoting.annotation.CFNotNull; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + */ +public class DeleteTopicRequestHeader implements CommandCustomHeader { + @CFNotNull + private String topic; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } +}
