http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java new file mode 100644 index 0000000..27c5f92 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.console.model; + +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + +public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { + private String group; + private String version; + private int count; + private ConsumeType consumeType; + private MessageModel messageModel; + private int consumeTps; + private long diffTotal = -1; + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } + + public ConsumeType getConsumeType() { + return consumeType; + } + + public void setConsumeType(ConsumeType consumeType) { + this.consumeType = consumeType; + } + + public MessageModel getMessageModel() { + return messageModel; + } + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + public long getDiffTotal() { + return diffTotal; + } + + public void setDiffTotal(long diffTotal) { + this.diffTotal = diffTotal; + } + + @Override + public int compareTo(GroupConsumeInfo o) { + if (this.count != o.count) { + return o.count - this.count; + } + + return (int) (o.diffTotal - diffTotal); + } + + public int getConsumeTps() { + return consumeTps; + } + + public void setConsumeTps(int consumeTps) { + this.consumeTps = consumeTps; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/MessageView.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/MessageView.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/MessageView.java new file mode 100644 index 0000000..4011cad --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/MessageView.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.console.model; + +import org.apache.rocketmq.common.message.MessageExt; +import com.google.common.base.Charsets; +import org.springframework.beans.BeanUtils; + +import java.net.SocketAddress; +import java.util.Map; + +public class MessageView { + + /** from MessageExt **/ + private int queueId; + private int storeSize; + private long queueOffset; + private int sysFlag; + private long bornTimestamp; + private SocketAddress bornHost; + private long storeTimestamp; + private SocketAddress storeHost; + private String msgId; + private long commitLogOffset; + private int bodyCRC; + private int reconsumeTimes; + private long preparedTransactionOffset; + /**from MessageExt**/ + + /** from Message **/ + private String topic; + private int flag; + private Map<String, String> properties; + private String messageBody; // body + + /** from Message **/ + + public static MessageView fromMessageExt(MessageExt messageExt) { + MessageView messageView = new MessageView(); + BeanUtils.copyProperties(messageExt, messageView); + if (messageExt.getBody() != null) { + messageView.setMessageBody(new String(messageExt.getBody(), Charsets.UTF_8)); + } + return messageView; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public int getFlag() { + return flag; + } + + public void setFlag(int flag) { + this.flag = flag; + } + + public Map<String, String> getProperties() { + return properties; + } + + public void setProperties(Map<String, String> properties) { + this.properties = properties; + } + + public int getQueueId() { + return queueId; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + public int getStoreSize() { + return storeSize; + } + + public void setStoreSize(int storeSize) { + this.storeSize = storeSize; + } + + public long getQueueOffset() { + return queueOffset; + } + + public void setQueueOffset(long queueOffset) { + this.queueOffset = 
queueOffset; + } + + public int getSysFlag() { + return sysFlag; + } + + public void setSysFlag(int sysFlag) { + this.sysFlag = sysFlag; + } + + public long getBornTimestamp() { + return bornTimestamp; + } + + public void setBornTimestamp(long bornTimestamp) { + this.bornTimestamp = bornTimestamp; + } + + public SocketAddress getBornHost() { + return bornHost; + } + + public void setBornHost(SocketAddress bornHost) { + this.bornHost = bornHost; + } + + public long getStoreTimestamp() { + return storeTimestamp; + } + + public void setStoreTimestamp(long storeTimestamp) { + this.storeTimestamp = storeTimestamp; + } + + public SocketAddress getStoreHost() { + return storeHost; + } + + public void setStoreHost(SocketAddress storeHost) { + this.storeHost = storeHost; + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public long getCommitLogOffset() { + return commitLogOffset; + } + + public void setCommitLogOffset(long commitLogOffset) { + this.commitLogOffset = commitLogOffset; + } + + public int getBodyCRC() { + return bodyCRC; + } + + public void setBodyCRC(int bodyCRC) { + this.bodyCRC = bodyCRC; + } + + public int getReconsumeTimes() { + return reconsumeTimes; + } + + public void setReconsumeTimes(int reconsumeTimes) { + this.reconsumeTimes = reconsumeTimes; + } + + public long getPreparedTransactionOffset() { + return preparedTransactionOffset; + } + + public void setPreparedTransactionOffset(long preparedTransactionOffset) { + this.preparedTransactionOffset = preparedTransactionOffset; + } + + public String getMessageBody() { + return messageBody; + } + + public void setMessageBody(String messageBody) { + this.messageBody = messageBody; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java ---------------------------------------------------------------------- diff 
--git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java new file mode 100644 index 0000000..a6d67a8 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.console.model; + +import org.apache.rocketmq.common.admin.OffsetWrapper; +import org.apache.rocketmq.common.message.MessageQueue; +import org.springframework.beans.BeanUtils; + +public class QueueStatInfo { + private String brokerName; + private int queueId; + private long brokerOffset; + private long consumerOffset; + private String clientInfo; + private long lastTimestamp; + + public static QueueStatInfo fromOffsetTableEntry(MessageQueue key, OffsetWrapper value) { + QueueStatInfo queueStatInfo = new QueueStatInfo(); + BeanUtils.copyProperties(key, queueStatInfo); + BeanUtils.copyProperties(value, queueStatInfo); + return queueStatInfo; + } + + public String getClientInfo() { + return clientInfo; + } + + public void setClientInfo(String clientInfo) { + this.clientInfo = clientInfo; + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public int getQueueId() { + return queueId; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + public long getBrokerOffset() { + return brokerOffset; + } + + public void setBrokerOffset(long brokerOffset) { + this.brokerOffset = brokerOffset; + } + + public long getConsumerOffset() { + return consumerOffset; + } + + public void setConsumerOffset(long consumerOffset) { + this.consumerOffset = consumerOffset; + } + + public long getLastTimestamp() { + return lastTimestamp; + } + + public void setLastTimestamp(long lastTimestamp) { + this.lastTimestamp = lastTimestamp; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java 
b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java new file mode 100644 index 0000000..bd6d8e2 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.console.model; + +import com.google.common.collect.Lists; + +import java.util.List; + +public class TopicConsumerInfo { + private String topic; + private long diffTotal; + private long lastTimestamp; + private List<QueueStatInfo> queueStatInfoList = Lists.newArrayList(); + + public TopicConsumerInfo(String topic) { + this.topic = topic; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public long getDiffTotal() { + return diffTotal; + } + + public void setDiffTotal(long diffTotal) { + this.diffTotal = diffTotal; + } + + public List<QueueStatInfo> getQueueStatInfoList() { + return queueStatInfoList; + } + + public long getLastTimestamp() { + return lastTimestamp; + } + + public void appendQueueStatInfo(QueueStatInfo queueStatInfo) { + queueStatInfoList.add(queueStatInfo); + diffTotal = diffTotal + (queueStatInfo.getBrokerOffset() - queueStatInfo.getConsumerOffset()); + lastTimestamp = Math.max(lastTimestamp, queueStatInfo.getLastTimestamp()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java new file mode 100644 index 0000000..cee155c --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.model.request; + +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; + +import java.util.List; + +public class ConsumerConfigInfo { + private List<String> clusterNameList; + + private List<String> brokerNameList; + private SubscriptionGroupConfig subscriptionGroupConfig; + + public ConsumerConfigInfo() { + } + + public ConsumerConfigInfo(List<String> brokerNameList, SubscriptionGroupConfig subscriptionGroupConfig) { + this.brokerNameList = brokerNameList; + this.subscriptionGroupConfig = subscriptionGroupConfig; + } + + public List<String> getClusterNameList() { + return clusterNameList; + } + + public void setClusterNameList(List<String> clusterNameList) { + this.clusterNameList = clusterNameList; + } + + public List<String> getBrokerNameList() { + return brokerNameList; + } + + public void setBrokerNameList(List<String> brokerNameList) { + this.brokerNameList = brokerNameList; + } + + public SubscriptionGroupConfig getSubscriptionGroupConfig() { + return subscriptionGroupConfig; + } + + public void setSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) { + this.subscriptionGroupConfig = subscriptionGroupConfig; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java 
---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java new file mode 100644 index 0000000..152256b --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.console.model.request; + +import java.util.List; + +public class DeleteSubGroupRequest { + private String groupName; + private List<String> brokerNameList; + + public String getGroupName() { + return groupName; + } + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + public List<String> getBrokerNameList() { + return brokerNameList; + } + + public void setBrokerNameList(List<String> brokerNameList) { + this.brokerNameList = brokerNameList; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java new file mode 100644 index 0000000..22263af --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.console.model.request; + +import java.util.List; + +public class ResetOffsetRequest { + private List<String> consumerGroupList; + private String topic; + private long resetTime; + private boolean force; + + public List<String> getConsumerGroupList() { + return consumerGroupList; + } + + public void setConsumerGroupList(List<String> consumerGroupList) { + this.consumerGroupList = consumerGroupList; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public long getResetTime() { + return resetTime; + } + + public void setResetTime(long resetTime) { + this.resetTime = resetTime; + } + + public boolean isForce() { + return force; + } + + public void setForce(boolean force) { + this.force = force; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java new file mode 100644 index 0000000..c7ffa8a --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.model.request; + +public class SendTopicMessageRequest { + private String topic; + private String key; + private String tag; + private String messageBody; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public String getMessageBody() { + return messageBody; + } + + public void setMessageBody(String messageBody) { + this.messageBody = messageBody; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java new file mode 100644 index 0000000..e1f56d4 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.model.request; + +import com.google.common.base.Objects; + +import java.util.List; + +public class TopicConfigInfo { + + private List<String> clusterNameList; + private List<String> brokerNameList; + + /** topicConfig */ + private String topicName; + private int writeQueueNums; + private int readQueueNums; + private int perm; + private boolean order; + + public List<String> getClusterNameList() { + return clusterNameList; + } + + public void setClusterNameList(List<String> clusterNameList) { + this.clusterNameList = clusterNameList; + } + + /** topicConfig */ + + + + public List<String> getBrokerNameList() { + return brokerNameList; + } + + public void setBrokerNameList(List<String> brokerNameList) { + this.brokerNameList = brokerNameList; + } + + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public int getWriteQueueNums() { + return writeQueueNums; + } + + public void setWriteQueueNums(int writeQueueNums) { + this.writeQueueNums = writeQueueNums; + } + + public int getReadQueueNums() { + return readQueueNums; + } + + public void setReadQueueNums(int readQueueNums) { + this.readQueueNums = readQueueNums; + } + + public int getPerm() { + return perm; + } + + public void setPerm(int perm) { + this.perm = perm; + } + + public boolean isOrder() { + return 
order; + } + + public void setOrder(boolean order) { + this.order = order; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TopicConfigInfo that = (TopicConfigInfo) o; + return writeQueueNums == that.writeQueueNums && + readQueueNums == that.readQueueNums && + perm == that.perm && + order == that.order && + Objects.equal(topicName, that.topicName); + } + + @Override + public int hashCode() { + return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java new file mode 100644 index 0000000..53a0e21 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service; + +import org.apache.rocketmq.tools.admin.MQAdminExt; +import com.google.common.base.Throwables; +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import javax.annotation.Resource; +import org.apache.commons.collections.CollectionUtils; + +public abstract class AbstractCommonService { + @Resource + protected MQAdminExt mqAdminExt; + protected final Set<String> changeToBrokerNameSet(HashMap<String, Set<String>> clusterAddrTable, + List<String> clusterNameList, List<String> brokerNameList) { + Set<String> finalBrokerNameList = Sets.newHashSet(); + if (CollectionUtils.isNotEmpty(clusterNameList)) { + try { + for (String clusterName : clusterNameList) { + finalBrokerNameList.addAll(clusterAddrTable.get(clusterName)); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + if (CollectionUtils.isNotEmpty(brokerNameList)) { + finalBrokerNameList.addAll(brokerNameList); + } + return finalBrokerNameList; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ClusterService.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ClusterService.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ClusterService.java new file mode 100644 index 0000000..43489fb --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ClusterService.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Cluster-level queries offered by the console.
+ * NOTE(review): only the signatures are visible here; the descriptions below are
+ * inferred from them and should be confirmed against the implementation.
+ */
+public interface ClusterService {
+    /**
+     * Returns cluster data as a string-keyed map.
+     * NOTE(review): the keys and value types are defined by the implementation - confirm.
+     */
+    Map<String, Object> list();
+
+    /**
+     * Returns configuration properties for a broker.
+     *
+     * @param brokerAddr address identifying the broker (presumably host:port - confirm)
+     */
+    Properties getBrokerConfig(String brokerAddr);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
new file mode 100644
index 0000000..1d9ac12
--- /dev/null
+++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat;
+import org.apache.rocketmq.console.model.GroupConsumeInfo;
+import org.apache.rocketmq.console.model.TopicConsumerInfo;
+import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
+import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
+import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Consumer-group operations offered by the console: group and consume-stat queries,
+ * offset reset, and subscription-group configuration management.
+ * NOTE(review): only the signatures are visible here; per-method notes are inferred
+ * from names and types and should be confirmed against the implementation.
+ */
+public interface ConsumerService {
+    /** Returns a summary for each consumer group (presumably every known group - confirm). */
+    List<GroupConsumeInfo> queryGroupList();
+
+    /** Returns the summary of the given consumer group. */
+    GroupConsumeInfo queryGroup(String consumerGroup);
+
+
+    /** Returns per-topic consume statistics for the given group. */
+    List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName);
+
+    /** Returns consume statistics for the given topic and group. */
+    List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName);
+
+    /**
+     * Returns consume statistics for the given topic as a string-keyed map.
+     * NOTE(review): the key is presumably the consumer group name - confirm.
+     */
+    Map<String, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic);
+
+    /** Resets offsets as described by the request; the result map is keyed by consumer group. */
+    Map<String /*consumerGroup*/, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest);
+
+    /** Returns the subscription-group configuration entries found for the given group. */
+    List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group);
+
+    /** Deletes the subscription group named in the request; the meaning of the boolean result is not visible here. */
+    boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest);
+
+    /** Creates or updates the given subscription-group configuration; the meaning of the boolean result is not visible here. */
+    boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo);
+
+    /** Returns the names of the brokers associated with the given subscription group. */
+    Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
+
+    /** Returns connection information for the given consumer group. */
+    ConsumerConnection getConsumerConnection(String consumerGroup);
+
+    /**
+     * Returns running information for one client of the given consumer group.
+     *
+     * @param jstack presumably requests that stack-trace data be included - confirm
+     */
+    ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
new file mode 100644
index 0000000..27a3645
--- /dev/null
+++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.rocketmq.console.service; + +import com.google.common.cache.LoadingCache; +import java.io.File; +import java.util.List; +import java.util.Map; +/** Contract exposing broker and topic data both as Guava LoadingCache instances and as maps looked up by a date string, plus a File-to-map conversion; every structure maps a String key to a list of strings. */ +public interface DashboardCollectService { + // todo just move the task to org.apache.rocketmq.console.task.DashboardCollectTask + // the code can be reconstruct + LoadingCache<String, List<String>> getBrokerMap(); + + LoadingCache<String, List<String>> getTopicMap(); + + Map<String, List<String>> jsonDataFile2map(File file); + + Map<String, List<String>> getBrokerCache(String date); + + Map<String, List<String>> getTopicCache(String date); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardService.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardService.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardService.java new file mode 100644 index 0000000..a4cf798 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/DashboardService.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.service; + +import java.util.List; +import java.util.Map; +/** Query contract returning broker data and topic data for a date string, topic data narrowed to one topic name, and current topic data; results are lists of strings or String-keyed maps of such lists. */ +public interface DashboardService { + /** + * @param date format yyyy-MM-dd + */ + Map<String, List<String>> queryBrokerData(String date); + + /** + * @param date format yyyy-MM-dd + */ + Map<String, List<String>> queryTopicData(String date); + + /** + * @param date format yyyy-MM-dd + * @param topicName + */ + List<String> queryTopicData(String date, String topicName); + + List<String> queryTopicCurrentData(); + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MessageService.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MessageService.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MessageService.java new file mode 100644 index 0000000..e56b4d8 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MessageService.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.service; + +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.tools.admin.api.MessageTrack; +import org.apache.rocketmq.console.model.MessageView; + +import java.util.List; +/** Contract for viewing a message by id (paired with its tracks), querying messages by topic and key or by topic and a begin/end pair of long values, listing message tracks, and requesting direct consumption of a message by a given consumer group and client id. */ +public interface MessageService { + /** + * @param subject + * @param msgId + */ + Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final String msgId); + + List<MessageView> queryMessageByTopicAndKey(final String topic, final String key); + + /** + * @param topic + * @param begin + * @param end + * org.apache.rocketmq.tools.command.message.PrintMessageSubCommand + */ + List<MessageView> queryMessageByTopic(final String topic, final long begin, + final long end); + + List<MessageTrack> messageTrackDetail(MessageExt msg); + + ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup, + String clientId); + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MonitorService.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MonitorService.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MonitorService.java new file mode 100644 index 0000000..4bf659c --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/MonitorService.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service; + +import java.util.Map; +import org.apache.rocketmq.console.model.ConsumerMonitorConfig; +/** Contract for creating or updating, querying (all entries or one by consumer group name), and deleting ConsumerMonitorConfig entries; the boolean results' meaning is defined by the implementation, not shown here. */ +public interface MonitorService { + boolean createOrUpdateConsumerMonitor(String name, ConsumerMonitorConfig config); + + Map<String, ConsumerMonitorConfig> queryConsumerMonitorConfig(); + + ConsumerMonitorConfig queryConsumerMonitorConfigByGroupName(String consumeGroupName); + + boolean deleteConsumerMonitor(String consumeGroupName); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/OpsService.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/OpsService.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/OpsService.java new file mode 100644 index 0000000..d3bd68c --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/OpsService.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service; + +import java.util.Map; +import org.apache.rocketmq.console.service.checker.CheckerType; +/** Contract for operational actions: home-page info, reading and updating the name-server address list (passed as a single String), running status checks whose results are keyed by CheckerType, and updating the VIP-channel setting from a String value. */ +public interface OpsService { + Map<String, Object> homePageInfo(); + + void updateNameSvrAddrList(String nameSvrAddrList); + + String getNameSvrList(); + + Map<CheckerType,Object> rocketMqStatusCheck(); + + boolean updateIsVIPChannel(String useVIPChannel); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ProducerService.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ProducerService.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ProducerService.java new file mode 100644 index 0000000..cda7c48 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/ProducerService.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.service; + +import org.apache.rocketmq.common.protocol.body.ProducerConnection; +/** Contract with a single lookup that returns the ProducerConnection for a producer group and topic. */ +public interface ProducerService { + ProducerConnection getProducerConnection(String producerGroup, String topic); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/TopicService.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/TopicService.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/TopicService.java new file mode 100644 index 0000000..41a6b3b --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/TopicService.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.service; + +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.console.model.request.SendTopicMessageRequest; +import org.apache.rocketmq.console.model.request.TopicConfigInfo; + +import java.util.List; +/** Contract for topic operations: listing all topics, stats, route data, consumer-group lookup, create/update, config examination (per broker name or as a list), deletion (by topic alone, by topic and cluster name, or by broker name and topic), and sending a message described by a SendTopicMessageRequest. */ +public interface TopicService { + TopicList fetchAllTopicList(); + + TopicStatsTable stats(String topic); + + TopicRouteData route(String topic); + + GroupList queryTopicConsumerInfo(String topic); + + void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest); + + TopicConfig examineTopicConfig(String topic, String brokerName); + + List<TopicConfigInfo> examineTopicConfig(String topic); + + boolean deleteTopic(String topic, String clusterName); + + boolean deleteTopic(String topic); + + boolean deleteTopicInBroker(String brokerName, String topic); + + SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest); + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java new file mode 100644 index 0000000..8a85008 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service.checker; +/** Enumerates the two checker kinds declared here: cluster health and topic-only-one-broker. */ +public enum CheckerType { + CLUSTER_HEALTH_CHECK, + TOPIC_ONLY_ONE_BROKER_CHECK + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java new file mode 100644 index 0000000..3b8f58d --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service.checker; +/** A check that produces an untyped result via doCheck() and reports which CheckerType it represents via checkerType(). */ +public interface RocketMqChecker { + public Object doCheck(); + + public CheckerType checkerType(); + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java new file mode 100644 index 0000000..5c2c893 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service.checker.impl; + +import org.apache.rocketmq.console.service.checker.CheckerType; +import org.apache.rocketmq.console.service.checker.RocketMqChecker; +import org.springframework.stereotype.Service; +/** RocketMqChecker annotated as a Spring service and reporting CheckerType.CLUSTER_HEALTH_CHECK; doCheck() is currently a stub that always returns null. */ +@Service +public class ClusterHealthCheckerImpl implements RocketMqChecker { + @Override + public Object doCheck() { + return null; + } + + @Override + public CheckerType checkerType() { + return CheckerType.CLUSTER_HEALTH_CHECK; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java new file mode 100644 index 0000000..0f06a13 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service.checker.impl; + +import org.apache.rocketmq.console.service.checker.CheckerType; +import org.apache.rocketmq.console.service.checker.RocketMqChecker; +import org.springframework.stereotype.Service; +/** RocketMqChecker annotated as a Spring service and reporting CheckerType.TOPIC_ONLY_ONE_BROKER_CHECK; doCheck() is currently a stub that always returns null. */ +@Service +public class TopicOnlyOneBrokerCheckerImpl implements RocketMqChecker { + @Override + public Object doCheck() { + return null; + } + + @Override + public CheckerType checkerType() { + return CheckerType.TOPIC_ONLY_ONE_BROKER_CHECK; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java new file mode 100644 index 0000000..0eba3a5 --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service.client; + +import com.google.common.base.Throwables; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQAdminImpl; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.body.KVTable; +import 
org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.console.util.JsonUtil; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.apache.rocketmq.tools.admin.api.MessageTrack; +import org.joor.Reflect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode; +/** MQAdminExt implementation annotated as a Spring service. The methods visible in this portion mostly forward their arguments to MQAdminInstance.threadLocalMQAdminExt(); examineSubscriptionGroupConfig and examineTopicConfig instead send a request through MQAdminInstance.threadLocalRemotingClient(). */ +@Service +public class MQAdminExtImpl implements MQAdminExt { + private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class); + + public MQAdminExtImpl() { + } +/** Forwards the broker address and properties unchanged to the thread-local MQAdminExt. */ + @Override + public void updateBrokerConfig(String brokerAddr, Properties properties) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + UnsupportedEncodingException, InterruptedException, MQBrokerException { + MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties); + } + + @Override + public void createAndUpdateTopicConfig(String addr, TopicConfig config) + throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config); + } + + @Override + public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr, config); + } +/** Sends GET_ALL_SUBSCRIPTIONGROUP_CONFIG to addr through the thread-local RemotingClient (timeout argument 3000, presumably milliseconds - confirm), decodes the body as a SubscriptionGroupWrapper and returns the table entry for group (presumably null when the group is absent - depends on the table type, confirm). Any exception from the call, or a non-SUCCESS response code wrapped in MQBrokerException, is rethrown via Throwables.propagate. */ + @Override + public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { + RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); + RemotingCommand response = null; + try { + response = remotingClient.invokeSync(addr, request, 3000); + } + catch (Exception err) { + throw Throwables.propagate(err); + } + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + SubscriptionGroupWrapper subscriptionGroupWrapper = decode(response.getBody(), SubscriptionGroupWrapper.class); + return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group); + } + default: + throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); + } + } +/** Sends GET_ALL_TOPIC_CONFIG to addr through the thread-local RemotingClient (timeout argument 3000, presumably milliseconds - confirm), decodes the body as a TopicConfigSerializeWrapper and returns the table entry for topic. Any exception from the call, or a non-SUCCESS response code wrapped in MQBrokerException, is rethrown via Throwables.propagate. */ + @Override + public TopicConfig examineTopicConfig(String addr, String topic) { + RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); + RemotingCommand response = null; + try { + response = remotingClient.invokeSync(addr, request, 3000); + } + catch (Exception err) { + throw Throwables.propagate(err); + } + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class); + return 
topicConfigSerializeWrapper.getTopicConfigTable().get(topic); + } + default: + throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); + } + } + + @Override + public TopicStatsTable examineTopicStats(String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic); + } +/** Fetches the topic list from the thread-local MQAdminExt, logs the JsonUtil.obj2String form of its topic set at debug level, and returns the list unchanged. */ + @Override + public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException { + TopicList topicList = MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList(); + logger.debug("op=look={}", JsonUtil.obj2String(topicList.getTopicList())); + return topicList; + } + + @Override + public KVTable fetchBrokerRuntimeStats(String brokerAddr) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr); + } + + @Override + public ConsumeStats examineConsumeStats(String consumerGroup) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup); + } + + @Override + public ConsumeStats examineConsumeStats(String consumerGroup, String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, topic); + } + + @Override + public ClusterInfo examineBrokerClusterInfo() + throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException { + return MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterInfo(); + } + + @Override + public TopicRouteData examineTopicRouteInfo(String topic) + throws RemotingException, MQClientException, InterruptedException { + return 
MQAdminInstance.threadLocalMQAdminExt().examineTopicRouteInfo(topic); + } + + @Override + public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException, MQBrokerException, RemotingException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup); + } + + @Override + public ProducerConnection examineProducerConnectionInfo(String producerGroup, String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().examineProducerConnectionInfo(producerGroup, topic); + } + + @Override + public List<String> getNameServerAddressList() { + return MQAdminInstance.threadLocalMQAdminExt().getNameServerAddressList(); + } + + @Override + public int wipeWritePermOfBroker(String namesrvAddr, String brokerName) + throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, InterruptedException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().wipeWritePermOfBroker(namesrvAddr, brokerName); + } + + @Override + public void putKVConfig(String namespace, String key, String value) { + MQAdminInstance.threadLocalMQAdminExt().putKVConfig(namespace, key, value); + } + + @Override + public String getKVConfig(String namespace, String key) + throws RemotingException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().getKVConfig(namespace, key); + } + + @Override + public KVTable getKVListByNamespace(String namespace) + throws RemotingException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().getKVListByNamespace(namespace); + } +/** Logs the JsonUtil.obj2String form of addrs together with the topic at info level, then forwards the deletion to the thread-local MQAdminExt. */ + @Override + public void deleteTopicInBroker(Set<String> addrs, String topic) + throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException { + logger.info("addrs={} topic={}", JsonUtil.obj2String(addrs), topic); + MQAdminInstance.threadLocalMQAdminExt().deleteTopicInBroker(addrs, topic); + } + + @Override + public void deleteTopicInNameServer(Set<String> addrs, String topic) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().deleteTopicInNameServer(addrs, topic); + } + + @Override + public void deleteSubscriptionGroup(String addr, String groupName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, groupName); + } + + @Override + public void createAndUpdateKvConfig(String namespace, String key, String value) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createAndUpdateKvConfig(namespace, key, value); + } + + @Override + public void deleteKvConfig(String namespace, String key) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().deleteKvConfig(namespace, key); + } + + @Override + public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, + boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force); + } + + @Override + public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, + boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, timestamp, isForce); + } + + @Override + public void resetOffsetNew(String consumerGroup, String topic, long 
timestamp) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().resetOffsetNew(consumerGroup, topic, timestamp); + } + + @Override + public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, + String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().getConsumeStatus(topic, group, clientAddr); + } + + @Override + public void createOrUpdateOrderConf(String key, String value, boolean isCluster) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createOrUpdateOrderConf(key, value, isCluster); + } + + @Override + public GroupList queryTopicConsumeByWho(String topic) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException, MQBrokerException, RemotingException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().queryTopicConsumeByWho(topic); + } + + @Override + public boolean cleanExpiredConsumerQueue(String cluster) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, + InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueue(cluster); + } + + @Override + public boolean cleanExpiredConsumerQueueByAddr(String addr) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, + InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueueByAddr(addr); + } + + @Override + public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) + throws RemotingException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, clientId, 
jstack); + } + + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, + String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, msgId); + } + + @Override + public List<MessageTrack> messageTrackDetail(MessageExt msg) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().messageTrackDetail(msg); + } + + @Override + public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + MQAdminInstance.threadLocalMQAdminExt().cloneGroupOffset(srcGroup, destGroup, topic, isOffline); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) + throws MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag); + } + + @Override + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().searchOffset(mq, timestamp); + } + + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().maxOffset(mq); + } + + @Override + public long minOffset(MessageQueue mq) throws MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().minOffset(mq); + } + + @Override + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq); + } + + @Override + public MessageExt 
viewMessage(String msgId) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId); + } + + @Override + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().queryMessage(topic, key, maxNum, begin, end); + } + + @Override + @Deprecated + public void start() throws MQClientException { + throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this"); + } + + @Override + @Deprecated + public void shutdown() { + throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this"); + } + + // below is 3.2.6->3.5.8 updated + + @Override + public List<QueueTimeSpan> queryConsumeTimeSpan(String topic, + String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group); + } + + //MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a day + //next version we will remove it + //https://issues.apache.org/jira/browse/ROCKETMQ-111 + //https://github.com/apache/incubator-rocketmq/pull/69 + @Override + public MessageExt viewMessage(String topic, + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId); + try { + return viewMessage(msgId); + } + catch (Exception e) { + } + MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl(); + QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32, + MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() 
- 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get(); + if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) { + return qr.getMessageList().get(0); + } + else { + return null; + } + } + + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String topic, + String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, topic, msgId); + } + + @Override + public Properties getBrokerConfig( + String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().getBrokerConfig(brokerAddr); + } + + @Override + public TopicList fetchTopicsByCLuster( + String clusterName) throws RemotingException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().fetchTopicsByCLuster(clusterName); + } + + @Override + public boolean cleanUnusedTopic( + String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopic(cluster); + } + + @Override + public boolean cleanUnusedTopicByAddr( + String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopicByAddr(addr); + } + + @Override + public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, + String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().viewBrokerStatsData(brokerAddr, 
statsName, statsKey); + } + + @Override + public Set<String> getClusterList( + String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().getClusterList(topic); + } + + @Override + public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, + long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis); + } + + @Override + public Set<String> getTopicClusterList( + String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException { + return MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic); + } + + @Override + public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().getAllSubscriptionGroup(brokerAddr, timeoutMillis); + } + + @Override + public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().getAllTopicGroup(brokerAddr, timeoutMillis); + } + + @Override + public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, + long offset) throws RemotingException, InterruptedException, MQBrokerException { + MQAdminInstance.threadLocalMQAdminExt().updateConsumeOffset(brokerAddr, consumeGroup, mq, offset); + } + + // 4.0.0 added + @Override public void updateNameServerConfig(Properties properties, + List<String> 
list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException { + + } + + @Override public Map<String, Properties> getNameServerConfig( + List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/4b8f1357/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java new file mode 100644 index 0000000..e914e6c --- /dev/null +++ b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.console.service.client; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.joor.Reflect; + +public class MQAdminInstance { + private static final ThreadLocal<DefaultMQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>(); + private static final ThreadLocal<Integer> INIT_COUNTER = new ThreadLocal<Integer>(); + + public static MQAdminExt threadLocalMQAdminExt() { + DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get(); + if (defaultMQAdminExt == null) { + throw new IllegalStateException("defaultMQAdminExt should be init before you get this"); + } + return defaultMQAdminExt; + } + + public static RemotingClient threadLocalRemotingClient() { + MQClientInstance mqClientInstance = threadLocalMqClientInstance(); + MQClientAPIImpl mQClientAPIImpl = Reflect.on(mqClientInstance).get("mQClientAPIImpl"); + return Reflect.on(mQClientAPIImpl).get("remotingClient"); + } + + public static MQClientInstance threadLocalMqClientInstance() { + DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl"); + return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance"); + } + + public static void initMQAdminInstance(long timeoutMillis) throws MQClientException { + Integer nowCount = INIT_COUNTER.get(); + if (nowCount == null) { + DefaultMQAdminExt defaultMQAdminExt; + if (timeoutMillis > 0) { + defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis); + } + else { + defaultMQAdminExt = new DefaultMQAdminExt(); + } + 
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + defaultMQAdminExt.start(); + MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt); + INIT_COUNTER.set(1); + } + else { + INIT_COUNTER.set(nowCount + 1); + } + + } + + public static void destroyMQAdminInstance() { + Integer nowCount = INIT_COUNTER.get() - 1; + if (nowCount > 0) { + INIT_COUNTER.set(nowCount); + return; + } + MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get(); + if (mqAdminExt != null) { + mqAdminExt.shutdown(); + MQ_ADMIN_EXT_THREAD_LOCAL.remove(); + INIT_COUNTER.remove(); + } + } +}
