http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java new file mode 100644 index 0000000..0eba3a5 --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service.client; + +import com.google.common.base.Throwables; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQAdminImpl; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.console.util.JsonUtil; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.apache.rocketmq.tools.admin.api.MessageTrack; +import org.joor.Reflect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode; + +@Service +public class MQAdminExtImpl implements MQAdminExt { + private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class); + + public MQAdminExtImpl() { + } + + @Override + public void updateBrokerConfig(String brokerAddr, Properties properties) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + UnsupportedEncodingException, InterruptedException, MQBrokerException { + MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties); + } + + @Override + public void createAndUpdateTopicConfig(String addr, TopicConfig config) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config); + } + + @Override + public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr, config); + } + + @Override + public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { + RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); + RemotingCommand response = null; + try { + response = remotingClient.invokeSync(addr, request, 3000); + } + catch (Exception err) { + throw Throwables.propagate(err); + } + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + SubscriptionGroupWrapper subscriptionGroupWrapper = decode(response.getBody(), SubscriptionGroupWrapper.class); + return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group); + } + default: + throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); + } + } + + @Override + public TopicConfig examineTopicConfig(String addr, String topic) { + RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); + RemotingCommand response = null; + try { + response = remotingClient.invokeSync(addr, request, 3000); + } + catch (Exception err) { + throw Throwables.propagate(err); + } + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class); + return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); + } + default: + throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); + } + } + + @Override + public TopicStatsTable examineTopicStats(String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic); + } + + @Override + public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException { + TopicList topicList = MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList(); + logger.debug("op=look={}", JsonUtil.obj2String(topicList.getTopicList())); + return topicList; + } + + @Override + public KVTable fetchBrokerRuntimeStats(String brokerAddr) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr); + } + + @Override + public ConsumeStats examineConsumeStats(String consumerGroup) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup); + } + + @Override + public ConsumeStats examineConsumeStats(String consumerGroup, String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, topic); + } + + @Override + public ClusterInfo examineBrokerClusterInfo() + throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException { + return MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterInfo(); + } + + @Override + public TopicRouteData examineTopicRouteInfo(String topic) + throws RemotingException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().examineTopicRouteInfo(topic); + } + + @Override + public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException, MQBrokerException, RemotingException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup); + } + + @Override + public ProducerConnection examineProducerConnectionInfo(String producerGroup, String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().examineProducerConnectionInfo(producerGroup, topic); + } + + @Override + public List<String> getNameServerAddressList() { + return MQAdminInstance.threadLocalMQAdminExt().getNameServerAddressList(); + } + + @Override + public int wipeWritePermOfBroker(String namesrvAddr, String brokerName) + throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, InterruptedException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().wipeWritePermOfBroker(namesrvAddr, brokerName); + } + + @Override + public void putKVConfig(String namespace, String key, String value) { + MQAdminInstance.threadLocalMQAdminExt().putKVConfig(namespace, key, value); + } + + @Override + public String getKVConfig(String namespace, String key) + throws RemotingException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().getKVConfig(namespace, key); + } + + @Override + public KVTable getKVListByNamespace(String namespace) + throws RemotingException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().getKVListByNamespace(namespace); + } + + @Override + public void deleteTopicInBroker(Set<String> addrs, String topic) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + logger.info("addrs={} topic={}", JsonUtil.obj2String(addrs), topic); + MQAdminInstance.threadLocalMQAdminExt().deleteTopicInBroker(addrs, topic); + } + + @Override + public void deleteTopicInNameServer(Set<String> addrs, String topic) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().deleteTopicInNameServer(addrs, topic); + } + + @Override + public void deleteSubscriptionGroup(String addr, String groupName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, groupName); + } + + @Override + public void createAndUpdateKvConfig(String namespace, String key, String value) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createAndUpdateKvConfig(namespace, key, value); + } + + @Override + public void deleteKvConfig(String namespace, String key) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().deleteKvConfig(namespace, key); + } + + @Override + public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, + boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force); + } + + @Override + public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, + boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, timestamp, isForce); + } + + @Override + public void resetOffsetNew(String consumerGroup, String topic, long timestamp) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().resetOffsetNew(consumerGroup, topic, timestamp); + } + + @Override + public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, + String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().getConsumeStatus(topic, group, clientAddr); + } + + @Override + public void createOrUpdateOrderConf(String key, String value, boolean isCluster) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createOrUpdateOrderConf(key, value, isCluster); + } + + @Override + public GroupList queryTopicConsumeByWho(String topic) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException, MQBrokerException, RemotingException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().queryTopicConsumeByWho(topic); + } + + @Override + public boolean cleanExpiredConsumerQueue(String cluster) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, + InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueue(cluster); + } + + @Override + public boolean cleanExpiredConsumerQueueByAddr(String addr) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, + InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueueByAddr(addr); + } + + @Override + public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) + throws RemotingException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, clientId, jstack); + } + + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, + String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, msgId); + } + + @Override + public List<MessageTrack> messageTrackDetail(MessageExt msg) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().messageTrackDetail(msg); + } + + @Override + public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + MQAdminInstance.threadLocalMQAdminExt().cloneGroupOffset(srcGroup, destGroup, topic, isOffline); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) + throws MQClientException { + MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag); + } + + @Override + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().searchOffset(mq, timestamp); + } + + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().maxOffset(mq); + } + + @Override + public long minOffset(MessageQueue mq) throws MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().minOffset(mq); + } + + @Override + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq); + } + + @Override + public MessageExt viewMessage(String msgId) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId); + } + + @Override + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().queryMessage(topic, key, maxNum, begin, end); + } + + @Override + @Deprecated + public void start() throws MQClientException { + throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this"); + } + + @Override + @Deprecated + public void shutdown() { + throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this"); + } + + // below is 3.2.6->3.5.8 updated + + @Override + public List<QueueTimeSpan> queryConsumeTimeSpan(String topic, + String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + return MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group); + } + + //MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a day + //next version we will remove it + //https://issues.apache.org/jira/browse/ROCKETMQ-111 + //https://github.com/apache/incubator-rocketmq/pull/69 + @Override + public MessageExt viewMessage(String topic, + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId); + try { + return viewMessage(msgId); + } + catch (Exception e) { + } + MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl(); + QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32, + MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get(); + if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) { + return qr.getMessageList().get(0); + } + else { + return null; + } + } + + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String topic, + String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, topic, msgId); + } + + @Override + public Properties getBrokerConfig( + String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().getBrokerConfig(brokerAddr); + } + + @Override + public TopicList fetchTopicsByCLuster( + String clusterName) throws RemotingException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().fetchTopicsByCLuster(clusterName); + } + + @Override + public boolean cleanUnusedTopic( + String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopic(cluster); + } + + @Override + public boolean cleanUnusedTopicByAddr( + String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopicByAddr(addr); + } + + @Override + public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, + String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().viewBrokerStatsData(brokerAddr, statsName, statsKey); + } + + @Override + public Set<String> getClusterList( + String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().getClusterList(topic); + } + + @Override + public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, + long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return MQAdminInstance.threadLocalMQAdminExt().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis); + } + + @Override + public Set<String> getTopicClusterList( + String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException { + return MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic); + } + + @Override + public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().getAllSubscriptionGroup(brokerAddr, timeoutMillis); + } + + @Override + public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + return MQAdminInstance.threadLocalMQAdminExt().getAllTopicGroup(brokerAddr, timeoutMillis); + } + + @Override + public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, + long offset) throws RemotingException, InterruptedException, MQBrokerException { + MQAdminInstance.threadLocalMQAdminExt().updateConsumeOffset(brokerAddr, consumeGroup, mq, offset); + } + + // 4.0.0 added + @Override public void updateNameServerConfig(Properties properties, + List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException { + + } + + @Override public Map<String, Properties> getNameServerConfig( + List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException { + return null; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java new file mode 100644 index 0000000..e914e6c --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service.client; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.joor.Reflect; + +public class MQAdminInstance { + private static final ThreadLocal<DefaultMQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>(); + private static final ThreadLocal<Integer> INIT_COUNTER = new ThreadLocal<Integer>(); + + public static MQAdminExt threadLocalMQAdminExt() { + DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get(); + if (defaultMQAdminExt == null) { + throw new IllegalStateException("defaultMQAdminExt should be init before you get this"); + } + return defaultMQAdminExt; + } + + public static RemotingClient threadLocalRemotingClient() { + MQClientInstance mqClientInstance = threadLocalMqClientInstance(); + MQClientAPIImpl mQClientAPIImpl = Reflect.on(mqClientInstance).get("mQClientAPIImpl"); + return Reflect.on(mQClientAPIImpl).get("remotingClient"); + } + + public static MQClientInstance threadLocalMqClientInstance() { + DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl"); + return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance"); + } + + public static void initMQAdminInstance(long timeoutMillis) throws MQClientException { + Integer nowCount = INIT_COUNTER.get(); + if (nowCount == null) { + DefaultMQAdminExt defaultMQAdminExt; + if (timeoutMillis > 0) { + defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis); + } + else { + defaultMQAdminExt = new DefaultMQAdminExt(); + } + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + defaultMQAdminExt.start(); + MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt); + INIT_COUNTER.set(1); + } + else { + INIT_COUNTER.set(nowCount + 1); + } + + } + + public static void destroyMQAdminInstance() { + Integer nowCount = INIT_COUNTER.get() - 1; + if (nowCount > 0) { + INIT_COUNTER.set(nowCount); + return; + } + MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get(); + if (mqAdminExt != null) { + mqAdminExt.shutdown(); + MQ_ADMIN_EXT_THREAD_LOCAL.remove(); + INIT_COUNTER.remove(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java new file mode 100644 index 0000000..e225edc --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.service.impl; + +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.apache.rocketmq.console.service.ClusterService; +import org.apache.rocketmq.console.util.JsonUtil; +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Map; +import java.util.Properties; + +@Service +public class ClusterServiceImpl implements ClusterService { + private Logger logger = LoggerFactory.getLogger(ClusterServiceImpl.class); + @Resource + private MQAdminExt mqAdminExt; + + @Override + public Map<String, Object> list() { + try { + Map<String, Object> resultMap = Maps.newHashMap(); + ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + logger.info("op=look_clusterInfo {}", JsonUtil.obj2String(clusterInfo)); + Map<String/*brokerName*/, Map<Long/* brokerId */, Object/* brokerDetail */>> brokerServer = Maps.newHashMap(); + for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { + Map<Long, Object> brokerMasterSlaveMap = Maps.newHashMap(); + for (Map.Entry<Long/* brokerId */, String/* broker address */> brokerAddr : brokerData.getBrokerAddrs().entrySet()) { + KVTable kvTable = mqAdminExt.fetchBrokerRuntimeStats(brokerAddr.getValue()); +// KVTable kvTable = mqAdminExt.fetchBrokerRuntimeStats("127.0.0.1:10911"); + brokerMasterSlaveMap.put(brokerAddr.getKey(), kvTable.getTable()); + } + brokerServer.put(brokerData.getBrokerName(), brokerMasterSlaveMap); + } + resultMap.put("clusterInfo", clusterInfo); + resultMap.put("brokerServer", brokerServer); + return resultMap; + } + catch (Exception err) { + throw Throwables.propagate(err); + } + } + + + @Override + public Properties getBrokerConfig(String brokerAddr) { + try { + return mqAdminExt.getBrokerConfig(brokerAddr); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java new file mode 100644 index 0000000..715cbf5 --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.service.impl; + +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod; +import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat; +import org.apache.rocketmq.console.model.GroupConsumeInfo; +import org.apache.rocketmq.console.model.QueueStatInfo; +import org.apache.rocketmq.console.model.TopicConsumerInfo; +import org.apache.rocketmq.console.model.request.ConsumerConfigInfo; +import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest; +import org.apache.rocketmq.console.model.request.ResetOffsetRequest; +import org.apache.rocketmq.console.service.AbstractCommonService; +import org.apache.rocketmq.console.service.ConsumerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import static com.google.common.base.Throwables.propagate; + +@Service +public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService { + private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); + + @Override + @MultiMQAdminCmdMethod + public List<GroupConsumeInfo> queryGroupList() { + Set<String> consumerGroupSet = Sets.newHashSet(); + try { + ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { + SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); + consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()); + } + } + catch (Exception err) { + throw Throwables.propagate(err); + } + List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList(); + for (String consumerGroup : consumerGroupSet) { + groupConsumeInfoList.add(queryGroup(consumerGroup)); + } + Collections.sort(groupConsumeInfoList); + return groupConsumeInfoList; + } + + @Override + @MultiMQAdminCmdMethod + public GroupConsumeInfo queryGroup(String consumerGroup) { + GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); + try { + ConsumeStats consumeStats = null; + try { + consumeStats = mqAdminExt.examineConsumeStats(consumerGroup); + } + catch (Exception e) { + logger.warn("examineConsumeStats exception, " + consumerGroup, e); + } + + ConsumerConnection consumerConnection = null; + try { + consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); + } + catch (Exception e) { + logger.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e); + } + + groupConsumeInfo.setGroup(consumerGroup); + + if (consumeStats != null) { + groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps()); + groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff()); + } + + if (consumerConnection != null) { + groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size()); + groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel()); + groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType()); + groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion())); + } + } + catch (Exception e) { + logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + + consumerGroup, e); + } + return groupConsumeInfo; + } + + @Override + public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) { + return queryConsumeStatsList(null, groupName); + } + + @Override + @MultiMQAdminCmdMethod + public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) { + ConsumeStats consumeStats = null; + try { + consumeStats = mqAdminExt.examineConsumeStats(groupName, topic); + } + catch (Exception e) { + throw propagate(e); + } + List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() { + @Override + public boolean apply(MessageQueue o) { + return StringUtils.isBlank(topic) || o.getTopic().equals(topic); + } + })); + Collections.sort(mqList); + List<TopicConsumerInfo> topicConsumerInfoList = Lists.newArrayList(); + TopicConsumerInfo nowTopicConsumerInfo = null; + Map<MessageQueue, String> messageQueueClientMap = getClientConnection(groupName); + for (MessageQueue mq : mqList) { + if (nowTopicConsumerInfo == null || (!StringUtils.equals(mq.getTopic(), nowTopicConsumerInfo.getTopic()))) { + nowTopicConsumerInfo = new TopicConsumerInfo(mq.getTopic()); + topicConsumerInfoList.add(nowTopicConsumerInfo); + } + QueueStatInfo queueStatInfo = QueueStatInfo.fromOffsetTableEntry(mq, consumeStats.getOffsetTable().get(mq)); + queueStatInfo.setClientInfo(messageQueueClientMap.get(mq)); + nowTopicConsumerInfo.appendQueueStatInfo(queueStatInfo); + } + return topicConsumerInfoList; + } + + private Map<MessageQueue, String> getClientConnection(String groupName) { + Map<MessageQueue, String> results = Maps.newHashMap(); + try { + ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(groupName); + for (Connection connection : consumerConnection.getConnectionSet()) { + String clinetId = connection.getClientId(); + ConsumerRunningInfo consumerRunningInfo = mqAdminExt.getConsumerRunningInfo(groupName, clinetId, false); + for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) { +// results.put(messageQueue, clinetId + " " + connection.getClientAddr()); + results.put(messageQueue, clinetId); + } + } + } + catch (Exception err) { + logger.error("op=getClientConnection_error", err); + } + return results; + } + + @Override + @MultiMQAdminCmdMethod + public Map<String /*groupName*/, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic) { + Map<String, TopicConsumerInfo> group2ConsumerInfoMap = Maps.newHashMap(); + try { + GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic); + for (String group : groupList.getGroupList()) { + List<TopicConsumerInfo> topicConsumerInfoList = null; + try { + topicConsumerInfoList = queryConsumeStatsList(topic, group); + } + catch (Exception ignore) { + } + group2ConsumerInfoMap.put(group, CollectionUtils.isEmpty(topicConsumerInfoList) ? new TopicConsumerInfo(topic) : topicConsumerInfoList.get(0)); + } + return group2ConsumerInfoMap; + } + catch (Exception e) { + throw propagate(e); + } + } + + @Override + @MultiMQAdminCmdMethod + public Map<String, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest) { + Map<String, ConsumerGroupRollBackStat> groupRollbackStats = Maps.newHashMap(); + for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) { + try { + Map<MessageQueue, Long> rollbackStatsMap = + mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce()); + ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true); + List<RollbackStats> rollbackStatsList = consumerGroupRollBackStat.getRollbackStatsList(); + for (Map.Entry<MessageQueue, Long> rollbackStatsEntty : rollbackStatsMap.entrySet()) { + RollbackStats rollbackStats = new RollbackStats(); + rollbackStats.setRollbackOffset(rollbackStatsEntty.getValue()); + rollbackStats.setQueueId(rollbackStatsEntty.getKey().getQueueId()); + rollbackStats.setBrokerName(rollbackStatsEntty.getKey().getBrokerName()); + rollbackStatsList.add(rollbackStats); + } + groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat); + } + catch (MQClientException e) { + if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { + try { + ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true); + List<RollbackStats> rollbackStatsList = mqAdminExt.resetOffsetByTimestampOld(consumerGroup, resetOffsetRequest.getTopic(), resetOffsetRequest.getResetTime(), true); + consumerGroupRollBackStat.setRollbackStatsList(rollbackStatsList); + groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat); + continue; + } + catch (Exception err) { + logger.error("op=resetOffset_which_not_online_error", err); + } + } + else { + logger.error("op=resetOffset_error", e); + } + groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage())); + } + catch (Exception e) { + logger.error("op=resetOffset_error", e); + groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage())); + } + } + return groupRollbackStats; + } + + @Override + @MultiMQAdminCmdMethod + public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) { + List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList(); + try { + ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) { //foreach brokerName + String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(); + SubscriptionGroupConfig subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group); + if (subscriptionGroupConfig == null) { + continue; + } + consumerConfigInfoList.add(new ConsumerConfigInfo(Lists.newArrayList(brokerName), subscriptionGroupConfig)); + } + } + catch (Exception e) { + throw propagate(e); + } + return consumerConfigInfoList; + } + + @Override + @MultiMQAdminCmdMethod + public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) { + try { + ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) { + logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName()); + mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName()); + } + } + catch (Exception e) { + throw propagate(e); + } + return true; + } + + @Override + public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) { + try { + ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), + consumerConfigInfo.getClusterNameList(), consumerConfigInfo.getBrokerNameList())) { + mqAdminExt.createAndUpdateSubscriptionGroupConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), consumerConfigInfo.getSubscriptionGroupConfig()); + } + } + catch (Exception err) { + throw Throwables.propagate(err); + } + return true; + } + + @Override + @MultiMQAdminCmdMethod + public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) { + Set<String> brokerNameSet = Sets.newHashSet(); + try { + List<ConsumerConfigInfo> consumerConfigInfoList = examineSubscriptionGroupConfig(group); + for (ConsumerConfigInfo consumerConfigInfo : consumerConfigInfoList) { + brokerNameSet.addAll(consumerConfigInfo.getBrokerNameList()); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + return brokerNameSet; + + } + + @Override + public ConsumerConnection getConsumerConnection(String consumerGroup) { + try { + return mqAdminExt.examineConsumerConnectionInfo(consumerGroup); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) { + try { + return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java new file mode 100644 index 0000000..d32a344 --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service.impl; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.common.base.Ticker; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Resource; +import org.apache.rocketmq.console.config.RMQConfigure; +import org.apache.rocketmq.console.exception.ServiceException; +import org.apache.rocketmq.console.service.DashboardCollectService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +@Service +public class DashboardCollectServiceImpl implements DashboardCollectService { + + @Resource + private RMQConfigure rmqConfigure; + + private final static Logger log = LoggerFactory.getLogger(DashboardCollectServiceImpl.class); + + private LoadingCache<String, List<String>> brokerMap = CacheBuilder.newBuilder() + .maximumSize(1000) + .concurrencyLevel(10) + .recordStats() + .ticker(Ticker.systemTicker()) + .removalListener(new RemovalListener<Object, Object>() { + @Override + public void onRemoval(RemovalNotification<Object, Object> notification) { + log.debug(notification.getKey() + " was removed, cause is " + notification.getCause()); + } + }) + .build( + new CacheLoader<String, List<String>>() { + @Override + public List<String> load(String key) { + List<String> list = Lists.newArrayList(); + return list; + } + } + ); + + private LoadingCache<String, List<String>> topicMap = CacheBuilder.newBuilder() + .maximumSize(1000) + .concurrencyLevel(10) + .recordStats() + .ticker(Ticker.systemTicker()) + .removalListener(new RemovalListener<Object, Object>() { + @Override + public void onRemoval(RemovalNotification<Object, Object> notification) { + log.debug(notification.getKey() + " was removed, cause is " + notification.getCause()); + } + }) + .build( + new CacheLoader<String, List<String>>() { + @Override + public List<String> load(String key) { + List<String> list = Lists.newArrayList(); + return list; + } + } + ); + + @Override + public LoadingCache<String, List<String>> getBrokerMap() { + return brokerMap; + } + @Override + public LoadingCache<String, List<String>> getTopicMap() { + return topicMap; + } + + @Override + public Map<String, List<String>> jsonDataFile2map(File file) { + List<String> strings; + try { + strings = Files.readLines(file, Charsets.UTF_8); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + StringBuffer sb = new StringBuffer(); + for (String string : strings) { + sb.append(string); + } + JSONObject json = (JSONObject) JSONObject.parse(sb.toString()); + Set<Map.Entry<String, Object>> entries = json.entrySet(); + Map<String, List<String>> map = Maps.newHashMap(); + for (Map.Entry<String, Object> entry : entries) { + JSONArray tpsArray = (JSONArray) entry.getValue(); + if (tpsArray == null) { + continue; + } + Object[] tpsStrArray = tpsArray.toArray(); + List<String> tpsList = Lists.newArrayList(); + for (Object tpsObj : tpsStrArray) { + tpsList.add("" + tpsObj); + } + map.put(entry.getKey(), tpsList); + } + return map; + } + + @Override + public Map<String, List<String>> getBrokerCache(String date) { + String dataLocationPath = rmqConfigure.getConsoleCollectData(); + File file = new File(dataLocationPath + date + ".json"); + if (!file.exists()) { + throw Throwables.propagate(new ServiceException(1, "This date have't data!")); + } + return jsonDataFile2map(file); + } + + @Override + public Map<String, List<String>> getTopicCache(String date) { + String dataLocationPath = rmqConfigure.getConsoleCollectData(); + File file = new File(dataLocationPath + date + "_topic" + ".json"); + if (!file.exists()) { + throw Throwables.propagate(new ServiceException(1, "This date have't data!")); + } + return jsonDataFile2map(file); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java new file mode 100644 index 0000000..3189093 --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.service.impl; + +import com.google.common.collect.Lists; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; +import javax.annotation.Resource; +import org.apache.rocketmq.console.service.DashboardCollectService; +import org.apache.rocketmq.console.service.DashboardService; +import org.springframework.stereotype.Service; + +@Service +public class DashboardServiceImpl implements DashboardService { + + @Resource + private DashboardCollectService dashboardCollectService; + /** + * @param date format yyyy-MM-dd + */ + @Override + public Map<String, List<String>> queryBrokerData(String date) { + return dashboardCollectService.getBrokerCache(date); + } + + @Override + public Map<String, List<String>> queryTopicData(String date) { + return dashboardCollectService.getTopicCache(date); + } + + /** + * @param date format yyyy-MM-dd + * @param topicName + */ + @Override + public List<String> queryTopicData(String date, String topicName) { + if (null != dashboardCollectService.getTopicCache(date)) { + return dashboardCollectService.getTopicCache(date).get(topicName); + } + return null; + } + + @Override + public List<String> queryTopicCurrentData() { + Date date = new Date(); + DateFormat format = new SimpleDateFormat("yyyy-MM-dd"); + Map<String, List<String>> topicCache = dashboardCollectService.getTopicCache(format.format(date)); + List<String> result = Lists.newArrayList(); + for (Map.Entry<String, List<String>> entry : topicCache.entrySet()) { + List<String> value = entry.getValue(); + result.add(entry.getKey() + "," + value.get(value.size() - 1).split(",")[4]); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java new file mode 100644 index 0000000..0205a69 --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.service.impl; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.List; +import java.util.Set; +import javax.annotation.Resource; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.console.model.MessageView; +import org.apache.rocketmq.console.service.MessageService; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.apache.rocketmq.tools.admin.api.MessageTrack; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +@Service +public class MessageServiceImpl implements MessageService { + + private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class); + /** + * @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64; + * @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch()); + */ + private final static int QUERY_MESSAGE_MAX_NUM = 64; + @Resource + private MQAdminExt mqAdminExt; + + public Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final String msgId) { + try { + + MessageExt messageExt = mqAdminExt.viewMessage(subject, msgId); + List<MessageTrack> messageTrackList = messageTrackDetail(messageExt); + return new Pair<>(MessageView.fromMessageExt(messageExt), messageTrackList); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public List<MessageView> queryMessageByTopicAndKey(String topic, String key) { + try { + return Lists.transform(mqAdminExt.queryMessage(topic, key, QUERY_MESSAGE_MAX_NUM, 0, System.currentTimeMillis()).getMessageList(), new Function<MessageExt, MessageView>() { + @Override + public MessageView apply(MessageExt messageExt) { + return MessageView.fromMessageExt(messageExt); + } + }); + } + catch (Exception err) { + throw Throwables.propagate(err); + } + } + + @Override + public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null); + List<MessageView> messageViewList = Lists.newArrayList(); + try { + String subExpression = "*"; + consumer.start(); + Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic); + for (MessageQueue mq : mqs) { + long minOffset = consumer.searchOffset(mq, begin); + long maxOffset = consumer.searchOffset(mq, end); + READQ: + for (long offset = minOffset; offset <= maxOffset; ) { + try { + if (messageViewList.size() > 2000) { + break; + } + PullResult pullResult = consumer.pull(mq, subExpression, offset, 32); + offset = pullResult.getNextBeginOffset(); + switch (pullResult.getPullStatus()) { + case FOUND: + + List<MessageView> messageViewListByQuery = Lists.transform(pullResult.getMsgFoundList(), new Function<MessageExt, MessageView>() { + @Override + public MessageView apply(MessageExt messageExt) { + messageExt.setBody(null); + return MessageView.fromMessageExt(messageExt); + } + }); + List<MessageView> filteredList = Lists.newArrayList(Iterables.filter(messageViewListByQuery, new Predicate<MessageView>() { + @Override + public boolean apply(MessageView messageView) { + if (messageView.getStoreTimestamp() < begin || messageView.getStoreTimestamp() > end) { + logger.info("begin={} end={} time not in range {} {}", begin, end, messageView.getStoreTimestamp(), new Date(messageView.getStoreTimestamp()).toString()); + } + return messageView.getStoreTimestamp() >= begin && messageView.getStoreTimestamp() <= end; + } + })); + messageViewList.addAll(filteredList); + break; + case NO_MATCHED_MSG: + case NO_NEW_MSG: + case OFFSET_ILLEGAL: + break READQ; + } + } + catch (Exception e) { + break; + } + } + } + Collections.sort(messageViewList, new Comparator<MessageView>() { + @Override + public int compare(MessageView o1, MessageView o2) { + if (o1.getStoreTimestamp() - o2.getStoreTimestamp() == 0) { + return 0; + } + return (o1.getStoreTimestamp() > o2.getStoreTimestamp()) ? -1 : 1; + } + }); + return messageViewList; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + finally { + consumer.shutdown(); + } + } + + @Override + public List<MessageTrack> messageTrackDetail(MessageExt msg) { + try { + return mqAdminExt.messageTrackDetail(msg); + } + catch (Exception e) { + logger.error("op=messageTrackDetailError", e); + return Collections.emptyList(); + } + } + + + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup, + String clientId) { + if (StringUtils.isNotBlank(clientId)) { + try { + return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + try { + ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); + for (Connection connection : consumerConnection.getConnectionSet()) { + if (StringUtils.isBlank(connection.getClientId())) { + continue; + } + logger.info("clientId={}", connection.getClientId()); + return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + throw new IllegalStateException("NO CONSUMER"); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java new file mode 100644 index 0000000..d3a109d --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service.impl; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Throwables; +import java.io.File; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.console.config.RMQConfigure; +import org.apache.rocketmq.console.model.ConsumerMonitorConfig; +import org.apache.rocketmq.console.service.MonitorService; +import org.apache.rocketmq.console.util.JsonUtil; +import org.springframework.stereotype.Service; + +@Service +public class MonitorServiceImpl implements MonitorService { + + + @Resource + private RMQConfigure rmqConfigure; + + private Map<String, ConsumerMonitorConfig> configMap = new ConcurrentHashMap<>(); + + @Override + public boolean createOrUpdateConsumerMonitor(String name, ConsumerMonitorConfig config) { + configMap.put(name, config);// todo if write map success but write file fail + writeToFile(getConsumerMonitorConfigDataPath(), configMap); + return true; + } + + @Override + public Map<String, ConsumerMonitorConfig> queryConsumerMonitorConfig() { + return configMap; + } + + @Override + public ConsumerMonitorConfig queryConsumerMonitorConfigByGroupName(String consumeGroupName) { + return configMap.get(consumeGroupName); + } + + @Override + public boolean deleteConsumerMonitor(String consumeGroupName) { + configMap.remove(consumeGroupName); + writeToFile(getConsumerMonitorConfigDataPath(), configMap); + return true; + } + + //rocketmq.console.data.path/monitor/consumerMonitorConfig.json + private String getConsumerMonitorConfigDataPath() { + return rmqConfigure.getRocketMqConsoleDataPath() + File.separatorChar + "monitor" + File.separatorChar + "consumerMonitorConfig.json"; + } + + private String getConsumerMonitorConfigDataPathBackUp() { + return getConsumerMonitorConfigDataPath() + ".bak"; + } + + private void writeToFile(String path, Object data) { + writeDataJsonToFile(path, JsonUtil.obj2String(data)); + } + + private void writeDataJsonToFile(String path, String dataStr) { + try { + MixAll.string2File(dataStr, path); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @PostConstruct + private void loadData() { + String content = MixAll.file2String(getConsumerMonitorConfigDataPath()); + if (content == null) { + content = MixAll.file2String(getConsumerMonitorConfigDataPathBackUp()); + } + if (content == null) { + return; + } + configMap = JsonUtil.string2Obj(content, new TypeReference<ConcurrentHashMap<String, ConsumerMonitorConfig>>() { + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java new file mode 100644 index 0000000..84e6d2f --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.console.service.impl; + +import com.google.common.base.Splitter; +import com.google.common.collect.Maps; +import java.util.List; +import java.util.Map; +import javax.annotation.Resource; +import org.apache.rocketmq.console.config.RMQConfigure; +import org.apache.rocketmq.console.service.AbstractCommonService; +import org.apache.rocketmq.console.service.OpsService; +import org.apache.rocketmq.console.service.checker.CheckerType; +import org.apache.rocketmq.console.service.checker.RocketMqChecker; +import org.springframework.stereotype.Service; + +@Service +public class OpsServiceImpl extends AbstractCommonService implements OpsService { + + @Resource + private RMQConfigure rMQConfigure; + + @Resource + private List<RocketMqChecker> rocketMqCheckerList; + + @Override + public Map<String, Object> homePageInfo() { + Map<String, Object> homePageInfoMap = Maps.newHashMap(); + homePageInfoMap.put("namesvrAddrList", Splitter.on(";").splitToList(rMQConfigure.getNamesrvAddr())); + homePageInfoMap.put("useVIPChannel", Boolean.valueOf(rMQConfigure.getIsVIPChannel())); + return homePageInfoMap; + } + + @Override + public void updateNameSvrAddrList(String nameSvrAddrList) { + rMQConfigure.setNamesrvAddr(nameSvrAddrList); + } + + @Override + public String getNameSvrList() { + return rMQConfigure.getNamesrvAddr(); + } + + @Override + public Map<CheckerType, Object> rocketMqStatusCheck() { + Map<CheckerType, Object> checkResultMap = Maps.newHashMap(); + for (RocketMqChecker rocketMqChecker : rocketMqCheckerList) { + checkResultMap.put(rocketMqChecker.checkerType(), rocketMqChecker.doCheck()); + } + return checkResultMap; + } + + @Override public boolean updateIsVIPChannel(String useVIPChannel) { + rMQConfigure.setIsVIPChannel(useVIPChannel); + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java new file mode 100644 index 0000000..3e46958 --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.service.impl; + +import com.google.common.base.Throwables; +import javax.annotation.Resource; +import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.console.service.ProducerService; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.springframework.stereotype.Service; + +@Service +public class ProducerServiceImpl implements ProducerService { + @Resource + private MQAdminExt mqAdminExt; + + @Override + public ProducerConnection getProducerConnection(String producerGroup, String topic) { + try { + return mqAdminExt.examineProducerConnectionInfo(producerGroup, topic); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java new file mode 100644 index 0000000..117bcfd --- /dev/null +++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.console.service.impl; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.console.config.RMQConfigure; +import org.apache.rocketmq.console.model.request.SendTopicMessageRequest; +import org.apache.rocketmq.console.model.request.TopicConfigInfo; +import org.apache.rocketmq.console.service.AbstractCommonService; +import org.apache.rocketmq.console.service.TopicService; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class TopicServiceImpl extends AbstractCommonService implements TopicService { + + @Autowired + private RMQConfigure rMQConfigure; + + @Override + public TopicList fetchAllTopicList() { + try { + return mqAdminExt.fetchAllTopicList(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public TopicStatsTable stats(String topic) { + try { + return mqAdminExt.examineTopicStats(topic); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public TopicRouteData route(String topic) { + try { + return mqAdminExt.examineTopicRouteInfo(topic); + } + catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + @Override + public GroupList queryTopicConsumerInfo(String topic) { + try { + return mqAdminExt.queryTopicConsumeByWho(topic); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) { + TopicConfig topicConfig = new TopicConfig(); + BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig); + try { + ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), + topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) { + mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig); + } + } + catch (Exception err) { + throw Throwables.propagate(err); + } + } + + @Override + public TopicConfig examineTopicConfig(String topic, String brokerName) { + ClusterInfo clusterInfo = null; + try { + clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic); + } + + @Override + public List<TopicConfigInfo> examineTopicConfig(String topic) { + List<TopicConfigInfo> topicConfigInfoList = Lists.newArrayList(); + TopicRouteData topicRouteData = route(topic); + for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { + TopicConfigInfo topicConfigInfo = new TopicConfigInfo(); + TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName()); + BeanUtils.copyProperties(topicConfig, topicConfigInfo); + topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName())); + topicConfigInfoList.add(topicConfigInfo); + } + return topicConfigInfoList; + } + + @Override + public boolean deleteTopic(String topic, String clusterName) { + try { + if (StringUtils.isBlank(clusterName)) { + return deleteTopic(topic); + } + Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, clusterName); + mqAdminExt.deleteTopicInBroker(masterSet, topic); + Set<String> nameServerSet = null; + if (StringUtils.isNotBlank(rMQConfigure.getNamesrvAddr())) { + String[] ns = rMQConfigure.getNamesrvAddr().split(";"); + nameServerSet = new HashSet<String>(Arrays.asList(ns)); + } + mqAdminExt.deleteTopicInNameServer(nameServerSet, topic); + } + catch (Exception err) { + throw Throwables.propagate(err); + } + return true; + } + + @Override + public boolean deleteTopic(String topic) { + ClusterInfo clusterInfo = null; + try { + clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + } + catch (Exception err) { + throw Throwables.propagate(err); + } + for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) { + deleteTopic(topic, clusterName); + } + return true; + } + + @Override + public boolean deleteTopicInBroker(String brokerName, String topic) { + + try { + ClusterInfo clusterInfo = null; + try { + clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + return true; + } + + @Override + public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) { + DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP); + producer.setInstanceName(String.valueOf(System.currentTimeMillis())); + producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr()); + try { + producer.start(); + Message msg = new Message(sendTopicMessageRequest.getTopic(), + sendTopicMessageRequest.getTag(), + sendTopicMessageRequest.getKey(), + sendTopicMessageRequest.getMessageBody().getBytes() + ); + return producer.send(msg); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + finally { + producer.shutdown(); + } + } + +}
