http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java deleted file mode 100644 index 0eba3a5..0000000 --- a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java +++ /dev/null @@ -1,501 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.rocketmq.console.service.client; - -import com.google.common.base.Throwables; -import java.io.UnsupportedEncodingException; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import org.apache.rocketmq.client.QueryResult; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.impl.MQAdminImpl; -import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.RollbackStats; -import org.apache.rocketmq.common.admin.TopicStatsTable; -import org.apache.rocketmq.common.message.MessageClientIDSetter; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.BrokerStatsData; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; -import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; -import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; -import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import 
org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; -import org.apache.rocketmq.console.util.JsonUtil; -import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.tools.admin.MQAdminExt; -import org.apache.rocketmq.tools.admin.api.MessageTrack; -import org.joor.Reflect; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode; - -@Service -public class MQAdminExtImpl implements MQAdminExt { - private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class); - - public MQAdminExtImpl() { - } - - @Override - public void updateBrokerConfig(String brokerAddr, Properties properties) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - UnsupportedEncodingException, InterruptedException, MQBrokerException { - MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties); - } - - @Override - public void createAndUpdateTopicConfig(String addr, TopicConfig config) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config); - } - - @Override - public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr, config); 
- } - - @Override - public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { - RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); - RemotingCommand response = null; - try { - response = remotingClient.invokeSync(addr, request, 3000); - } - catch (Exception err) { - throw Throwables.propagate(err); - } - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - SubscriptionGroupWrapper subscriptionGroupWrapper = decode(response.getBody(), SubscriptionGroupWrapper.class); - return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group); - } - default: - throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); - } - } - - @Override - public TopicConfig examineTopicConfig(String addr, String topic) { - RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); - RemotingCommand response = null; - try { - response = remotingClient.invokeSync(addr, request, 3000); - } - catch (Exception err) { - throw Throwables.propagate(err); - } - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class); - return topicConfigSerializeWrapper.getTopicConfigTable().get(topic); - } - default: - throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark())); - } - } - - @Override - public TopicStatsTable examineTopicStats(String topic) - throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - return MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic); - } - - @Override - public TopicList fetchAllTopicList() throws 
RemotingException, MQClientException, InterruptedException { - TopicList topicList = MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList(); - logger.debug("op=look={}", JsonUtil.obj2String(topicList.getTopicList())); - return topicList; - } - - @Override - public KVTable fetchBrokerRuntimeStats(String brokerAddr) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException, MQBrokerException { - return MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr); - } - - @Override - public ConsumeStats examineConsumeStats(String consumerGroup) - throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup); - } - - @Override - public ConsumeStats examineConsumeStats(String consumerGroup, String topic) - throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, topic); - } - - @Override - public ClusterInfo examineBrokerClusterInfo() - throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, - RemotingConnectException { - return MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterInfo(); - } - - @Override - public TopicRouteData examineTopicRouteInfo(String topic) - throws RemotingException, MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().examineTopicRouteInfo(topic); - } - - @Override - public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException, MQBrokerException, RemotingException, MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup); - } - - @Override - public ProducerConnection 
examineProducerConnectionInfo(String producerGroup, String topic) - throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - return MQAdminInstance.threadLocalMQAdminExt().examineProducerConnectionInfo(producerGroup, topic); - } - - @Override - public List<String> getNameServerAddressList() { - return MQAdminInstance.threadLocalMQAdminExt().getNameServerAddressList(); - } - - @Override - public int wipeWritePermOfBroker(String namesrvAddr, String brokerName) - throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, InterruptedException, MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().wipeWritePermOfBroker(namesrvAddr, brokerName); - } - - @Override - public void putKVConfig(String namespace, String key, String value) { - MQAdminInstance.threadLocalMQAdminExt().putKVConfig(namespace, key, value); - } - - @Override - public String getKVConfig(String namespace, String key) - throws RemotingException, MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().getKVConfig(namespace, key); - } - - @Override - public KVTable getKVListByNamespace(String namespace) - throws RemotingException, MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().getKVListByNamespace(namespace); - } - - @Override - public void deleteTopicInBroker(Set<String> addrs, String topic) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - logger.info("addrs={} topic={}", JsonUtil.obj2String(addrs), topic); - MQAdminInstance.threadLocalMQAdminExt().deleteTopicInBroker(addrs, topic); - } - - @Override - public void deleteTopicInNameServer(Set<String> addrs, String topic) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - MQAdminInstance.threadLocalMQAdminExt().deleteTopicInNameServer(addrs, topic); - } - - @Override - public void 
deleteSubscriptionGroup(String addr, String groupName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, groupName); - } - - @Override - public void createAndUpdateKvConfig(String namespace, String key, String value) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - MQAdminInstance.threadLocalMQAdminExt().createAndUpdateKvConfig(namespace, key, value); - } - - @Override - public void deleteKvConfig(String namespace, String key) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - MQAdminInstance.threadLocalMQAdminExt().deleteKvConfig(namespace, key); - } - - @Override - public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, - boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force); - } - - @Override - public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, - boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, timestamp, isForce); - } - - @Override - public void resetOffsetNew(String consumerGroup, String topic, long timestamp) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - MQAdminInstance.threadLocalMQAdminExt().resetOffsetNew(consumerGroup, topic, timestamp); - } - - @Override - public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, - String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().getConsumeStatus(topic, group, 
clientAddr); - } - - @Override - public void createOrUpdateOrderConf(String key, String value, boolean isCluster) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - MQAdminInstance.threadLocalMQAdminExt().createOrUpdateOrderConf(key, value, isCluster); - } - - @Override - public GroupList queryTopicConsumeByWho(String topic) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException, MQBrokerException, RemotingException, MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().queryTopicConsumeByWho(topic); - } - - @Override - public boolean cleanExpiredConsumerQueue(String cluster) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, - InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueue(cluster); - } - - @Override - public boolean cleanExpiredConsumerQueueByAddr(String addr) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, - InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueueByAddr(addr); - } - - @Override - public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) - throws RemotingException, MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, clientId, jstack); - } - - @Override - public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, - String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, msgId); - } - - @Override - public List<MessageTrack> messageTrackDetail(MessageExt msg) - throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException { - return MQAdminInstance.threadLocalMQAdminExt().messageTrackDetail(msg); - } - - @Override - public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) - throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - MQAdminInstance.threadLocalMQAdminExt().cloneGroupOffset(srcGroup, destGroup, topic, isOffline); - } - - @Override - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum); - } - - @Override - public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) - throws MQClientException { - MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag); - } - - @Override - public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().searchOffset(mq, timestamp); - } - - @Override - public long maxOffset(MessageQueue mq) throws MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().maxOffset(mq); - } - - @Override - public long minOffset(MessageQueue mq) throws MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().minOffset(mq); - } - - @Override - public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq); - } - - @Override - public MessageExt viewMessage(String msgId) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId); - } - - @Override - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().queryMessage(topic, key, maxNum, begin, end); - } - - @Override - @Deprecated - public 
void start() throws MQClientException { - throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this"); - } - - @Override - @Deprecated - public void shutdown() { - throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this"); - } - - // below is 3.2.6->3.5.8 updated - - @Override - public List<QueueTimeSpan> queryConsumeTimeSpan(String topic, - String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { - return MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group); - } - - //MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a day - //next version we will remove it - //https://issues.apache.org/jira/browse/ROCKETMQ-111 - //https://github.com/apache/incubator-rocketmq/pull/69 - @Override - public MessageExt viewMessage(String topic, - String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId); - try { - return viewMessage(msgId); - } - catch (Exception e) { - } - MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl(); - QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32, - MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get(); - if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) { - return qr.getMessageList().get(0); - } - else { - return null; - } - } - - @Override - public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String topic, - String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - return 
MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, topic, msgId); - } - - @Override - public Properties getBrokerConfig( - String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException { - return MQAdminInstance.threadLocalMQAdminExt().getBrokerConfig(brokerAddr); - } - - @Override - public TopicList fetchTopicsByCLuster( - String clusterName) throws RemotingException, MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().fetchTopicsByCLuster(clusterName); - } - - @Override - public boolean cleanUnusedTopic( - String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopic(cluster); - } - - @Override - public boolean cleanUnusedTopicByAddr( - String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopicByAddr(addr); - } - - @Override - public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, - String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().viewBrokerStatsData(brokerAddr, statsName, statsKey); - } - - @Override - public Set<String> getClusterList( - String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().getClusterList(topic); - } - - @Override - public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, - long timeoutMillis) throws 
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { - return MQAdminInstance.threadLocalMQAdminExt().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis); - } - - @Override - public Set<String> getTopicClusterList( - String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException { - return MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic); - } - - @Override - public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr, - long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { - return MQAdminInstance.threadLocalMQAdminExt().getAllSubscriptionGroup(brokerAddr, timeoutMillis); - } - - @Override - public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr, - long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { - return MQAdminInstance.threadLocalMQAdminExt().getAllTopicGroup(brokerAddr, timeoutMillis); - } - - @Override - public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, - long offset) throws RemotingException, InterruptedException, MQBrokerException { - MQAdminInstance.threadLocalMQAdminExt().updateConsumeOffset(brokerAddr, consumeGroup, mq, offset); - } - - // 4.0.0 added - @Override public void updateNameServerConfig(Properties properties, - List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException { - - } - - @Override public Map<String, Properties> getNameServerConfig( - List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, 
UnsupportedEncodingException { - return null; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java deleted file mode 100644 index e914e6c..0000000 --- a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.rocketmq.console.service.client; - -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.impl.MQClientAPIImpl; -import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; -import org.apache.rocketmq.tools.admin.MQAdminExt; -import org.joor.Reflect; - -public class MQAdminInstance { - private static final ThreadLocal<DefaultMQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>(); - private static final ThreadLocal<Integer> INIT_COUNTER = new ThreadLocal<Integer>(); - - public static MQAdminExt threadLocalMQAdminExt() { - DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get(); - if (defaultMQAdminExt == null) { - throw new IllegalStateException("defaultMQAdminExt should be init before you get this"); - } - return defaultMQAdminExt; - } - - public static RemotingClient threadLocalRemotingClient() { - MQClientInstance mqClientInstance = threadLocalMqClientInstance(); - MQClientAPIImpl mQClientAPIImpl = Reflect.on(mqClientInstance).get("mQClientAPIImpl"); - return Reflect.on(mQClientAPIImpl).get("remotingClient"); - } - - public static MQClientInstance threadLocalMqClientInstance() { - DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl"); - return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance"); - } - - public static void initMQAdminInstance(long timeoutMillis) throws MQClientException { - Integer nowCount = INIT_COUNTER.get(); - if (nowCount == null) { - DefaultMQAdminExt defaultMQAdminExt; - if (timeoutMillis > 0) { - defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis); - } - else { - defaultMQAdminExt = new DefaultMQAdminExt(); - } - 
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - defaultMQAdminExt.start(); - MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt); - INIT_COUNTER.set(1); - } - else { - INIT_COUNTER.set(nowCount + 1); - } - - } - - public static void destroyMQAdminInstance() { - Integer nowCount = INIT_COUNTER.get() - 1; - if (nowCount > 0) { - INIT_COUNTER.set(nowCount); - return; - } - MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get(); - if (mqAdminExt != null) { - mqAdminExt.shutdown(); - MQ_ADMIN_EXT_THREAD_LOCAL.remove(); - INIT_COUNTER.remove(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java deleted file mode 100644 index e225edc..0000000 --- a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.rocketmq.console.service.impl;

import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.console.service.ClusterService;
import org.apache.rocketmq.console.util.JsonUtil;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Map;
import java.util.Properties;

/**
 * {@link ClusterService} implementation backed by the injected {@link MQAdminExt}.
 * Any exception raised by the admin client is rethrown through
 * {@link Throwables#propagate(Throwable)}.
 */
@Service
public class ClusterServiceImpl implements ClusterService {
    private Logger logger = LoggerFactory.getLogger(ClusterServiceImpl.class);
    @Resource
    private MQAdminExt mqAdminExt;

    /**
     * Builds the cluster overview.
     *
     * @return a map with two entries: {@code "clusterInfo"} holding the {@link ClusterInfo}
     *         returned by the admin client, and {@code "brokerServer"} mapping each broker name
     *         to a map from broker id to that broker's runtime-stats table
     */
    @Override
    public Map<String, Object> list() {
        try {
            ClusterInfo cluster = mqAdminExt.examineBrokerClusterInfo();
            logger.info("op=look_clusterInfo {}", JsonUtil.obj2String(cluster));

            // broker name -> (broker id -> runtime stats of the broker at that address)
            Map<String, Map<Long, Object>> statsByBrokerName = Maps.newHashMap();
            for (BrokerData broker : cluster.getBrokerAddrTable().values()) {
                Map<Long, Object> statsByBrokerId = Maps.newHashMap();
                for (Map.Entry<Long, String> idToAddress : broker.getBrokerAddrs().entrySet()) {
                    KVTable runtimeStats = mqAdminExt.fetchBrokerRuntimeStats(idToAddress.getValue());
                    statsByBrokerId.put(idToAddress.getKey(), runtimeStats.getTable());
                }
                statsByBrokerName.put(broker.getBrokerName(), statsByBrokerId);
            }

            Map<String, Object> overview = Maps.newHashMap();
            overview.put("clusterInfo", cluster);
            overview.put("brokerServer", statsByBrokerName);
            return overview;
        } catch (Exception failure) {
            throw Throwables.propagate(failure);
        }
    }

    /**
     * Returns the configuration the admin client reports for the broker at {@code brokerAddr}.
     */
    @Override
    public Properties getBrokerConfig(String brokerAddr) {
        Properties brokerConfig;
        try {
            brokerConfig = mqAdminExt.getBrokerConfig(brokerAddr);
        } catch (Exception failure) {
            throw Throwables.propagate(failure);
        }
        return brokerConfig;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java deleted file mode 100644 index 715cbf5..0000000 --- a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.console.service.impl; - -import com.google.common.base.Predicate; -import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.MQVersion; -import org.apache.rocketmq.common.admin.ConsumeStats; -import org.apache.rocketmq.common.admin.RollbackStats; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.Connection; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; -import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod; -import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat; -import org.apache.rocketmq.console.model.GroupConsumeInfo; -import org.apache.rocketmq.console.model.QueueStatInfo; -import org.apache.rocketmq.console.model.TopicConsumerInfo; -import org.apache.rocketmq.console.model.request.ConsumerConfigInfo; -import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest; -import org.apache.rocketmq.console.model.request.ResetOffsetRequest; -import org.apache.rocketmq.console.service.AbstractCommonService; -import 
org.apache.rocketmq.console.service.ConsumerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -import static com.google.common.base.Throwables.propagate; - -@Service -public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService { - private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); - - @Override - @MultiMQAdminCmdMethod - public List<GroupConsumeInfo> queryGroupList() { - Set<String> consumerGroupSet = Sets.newHashSet(); - try { - ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); - for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { - SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); - consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()); - } - } - catch (Exception err) { - throw Throwables.propagate(err); - } - List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList(); - for (String consumerGroup : consumerGroupSet) { - groupConsumeInfoList.add(queryGroup(consumerGroup)); - } - Collections.sort(groupConsumeInfoList); - return groupConsumeInfoList; - } - - @Override - @MultiMQAdminCmdMethod - public GroupConsumeInfo queryGroup(String consumerGroup) { - GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); - try { - ConsumeStats consumeStats = null; - try { - consumeStats = mqAdminExt.examineConsumeStats(consumerGroup); - } - catch (Exception e) { - logger.warn("examineConsumeStats exception, " + consumerGroup, e); - } - - ConsumerConnection consumerConnection = null; - try { - consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); - } - catch (Exception e) { - logger.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e); - } - - groupConsumeInfo.setGroup(consumerGroup); - - if (consumeStats != null) { - 
groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps()); - groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff()); - } - - if (consumerConnection != null) { - groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size()); - groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel()); - groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType()); - groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion())); - } - } - catch (Exception e) { - logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " - + consumerGroup, e); - } - return groupConsumeInfo; - } - - @Override - public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) { - return queryConsumeStatsList(null, groupName); - } - - @Override - @MultiMQAdminCmdMethod - public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) { - ConsumeStats consumeStats = null; - try { - consumeStats = mqAdminExt.examineConsumeStats(groupName, topic); - } - catch (Exception e) { - throw propagate(e); - } - List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() { - @Override - public boolean apply(MessageQueue o) { - return StringUtils.isBlank(topic) || o.getTopic().equals(topic); - } - })); - Collections.sort(mqList); - List<TopicConsumerInfo> topicConsumerInfoList = Lists.newArrayList(); - TopicConsumerInfo nowTopicConsumerInfo = null; - Map<MessageQueue, String> messageQueueClientMap = getClientConnection(groupName); - for (MessageQueue mq : mqList) { - if (nowTopicConsumerInfo == null || (!StringUtils.equals(mq.getTopic(), nowTopicConsumerInfo.getTopic()))) { - nowTopicConsumerInfo = new TopicConsumerInfo(mq.getTopic()); - topicConsumerInfoList.add(nowTopicConsumerInfo); - } - QueueStatInfo queueStatInfo = QueueStatInfo.fromOffsetTableEntry(mq, consumeStats.getOffsetTable().get(mq)); - 
queueStatInfo.setClientInfo(messageQueueClientMap.get(mq)); - nowTopicConsumerInfo.appendQueueStatInfo(queueStatInfo); - } - return topicConsumerInfoList; - } - - private Map<MessageQueue, String> getClientConnection(String groupName) { - Map<MessageQueue, String> results = Maps.newHashMap(); - try { - ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(groupName); - for (Connection connection : consumerConnection.getConnectionSet()) { - String clinetId = connection.getClientId(); - ConsumerRunningInfo consumerRunningInfo = mqAdminExt.getConsumerRunningInfo(groupName, clinetId, false); - for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) { -// results.put(messageQueue, clinetId + " " + connection.getClientAddr()); - results.put(messageQueue, clinetId); - } - } - } - catch (Exception err) { - logger.error("op=getClientConnection_error", err); - } - return results; - } - - @Override - @MultiMQAdminCmdMethod - public Map<String /*groupName*/, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic) { - Map<String, TopicConsumerInfo> group2ConsumerInfoMap = Maps.newHashMap(); - try { - GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic); - for (String group : groupList.getGroupList()) { - List<TopicConsumerInfo> topicConsumerInfoList = null; - try { - topicConsumerInfoList = queryConsumeStatsList(topic, group); - } - catch (Exception ignore) { - } - group2ConsumerInfoMap.put(group, CollectionUtils.isEmpty(topicConsumerInfoList) ? 
new TopicConsumerInfo(topic) : topicConsumerInfoList.get(0)); - } - return group2ConsumerInfoMap; - } - catch (Exception e) { - throw propagate(e); - } - } - - @Override - @MultiMQAdminCmdMethod - public Map<String, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest) { - Map<String, ConsumerGroupRollBackStat> groupRollbackStats = Maps.newHashMap(); - for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) { - try { - Map<MessageQueue, Long> rollbackStatsMap = - mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce()); - ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true); - List<RollbackStats> rollbackStatsList = consumerGroupRollBackStat.getRollbackStatsList(); - for (Map.Entry<MessageQueue, Long> rollbackStatsEntty : rollbackStatsMap.entrySet()) { - RollbackStats rollbackStats = new RollbackStats(); - rollbackStats.setRollbackOffset(rollbackStatsEntty.getValue()); - rollbackStats.setQueueId(rollbackStatsEntty.getKey().getQueueId()); - rollbackStats.setBrokerName(rollbackStatsEntty.getKey().getBrokerName()); - rollbackStatsList.add(rollbackStats); - } - groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat); - } - catch (MQClientException e) { - if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { - try { - ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true); - List<RollbackStats> rollbackStatsList = mqAdminExt.resetOffsetByTimestampOld(consumerGroup, resetOffsetRequest.getTopic(), resetOffsetRequest.getResetTime(), true); - consumerGroupRollBackStat.setRollbackStatsList(rollbackStatsList); - groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat); - continue; - } - catch (Exception err) { - logger.error("op=resetOffset_which_not_online_error", err); - } - } - else { - logger.error("op=resetOffset_error", e); - } - 
groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage())); - } - catch (Exception e) { - logger.error("op=resetOffset_error", e); - groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage())); - } - } - return groupRollbackStats; - } - - @Override - @MultiMQAdminCmdMethod - public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) { - List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList(); - try { - ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); - for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) { //foreach brokerName - String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(); - SubscriptionGroupConfig subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group); - if (subscriptionGroupConfig == null) { - continue; - } - consumerConfigInfoList.add(new ConsumerConfigInfo(Lists.newArrayList(brokerName), subscriptionGroupConfig)); - } - } - catch (Exception e) { - throw propagate(e); - } - return consumerConfigInfoList; - } - - @Override - @MultiMQAdminCmdMethod - public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) { - try { - ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); - for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) { - logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName()); - mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName()); - } - } - catch (Exception e) { - throw propagate(e); - } - return true; - } - - @Override - public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) { - try { - ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); - for (String brokerName : 
changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), - consumerConfigInfo.getClusterNameList(), consumerConfigInfo.getBrokerNameList())) { - mqAdminExt.createAndUpdateSubscriptionGroupConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), consumerConfigInfo.getSubscriptionGroupConfig()); - } - } - catch (Exception err) { - throw Throwables.propagate(err); - } - return true; - } - - @Override - @MultiMQAdminCmdMethod - public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) { - Set<String> brokerNameSet = Sets.newHashSet(); - try { - List<ConsumerConfigInfo> consumerConfigInfoList = examineSubscriptionGroupConfig(group); - for (ConsumerConfigInfo consumerConfigInfo : consumerConfigInfoList) { - brokerNameSet.addAll(consumerConfigInfo.getBrokerNameList()); - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } - return brokerNameSet; - - } - - @Override - public ConsumerConnection getConsumerConnection(String consumerGroup) { - try { - return mqAdminExt.examineConsumerConnectionInfo(consumerGroup); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) { - try { - return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java deleted file mode 100644 index d32a344..0000000 --- 
a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.console.service.impl; - -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.google.common.base.Charsets; -import com.google.common.base.Throwables; -import com.google.common.base.Ticker; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import javax.annotation.Resource; -import org.apache.rocketmq.console.config.RMQConfigure; -import org.apache.rocketmq.console.exception.ServiceException; -import org.apache.rocketmq.console.service.DashboardCollectService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import 
org.springframework.stereotype.Service; - -@Service -public class DashboardCollectServiceImpl implements DashboardCollectService { - - @Resource - private RMQConfigure rmqConfigure; - - private final static Logger log = LoggerFactory.getLogger(DashboardCollectServiceImpl.class); - - private LoadingCache<String, List<String>> brokerMap = CacheBuilder.newBuilder() - .maximumSize(1000) - .concurrencyLevel(10) - .recordStats() - .ticker(Ticker.systemTicker()) - .removalListener(new RemovalListener<Object, Object>() { - @Override - public void onRemoval(RemovalNotification<Object, Object> notification) { - log.debug(notification.getKey() + " was removed, cause is " + notification.getCause()); - } - }) - .build( - new CacheLoader<String, List<String>>() { - @Override - public List<String> load(String key) { - List<String> list = Lists.newArrayList(); - return list; - } - } - ); - - private LoadingCache<String, List<String>> topicMap = CacheBuilder.newBuilder() - .maximumSize(1000) - .concurrencyLevel(10) - .recordStats() - .ticker(Ticker.systemTicker()) - .removalListener(new RemovalListener<Object, Object>() { - @Override - public void onRemoval(RemovalNotification<Object, Object> notification) { - log.debug(notification.getKey() + " was removed, cause is " + notification.getCause()); - } - }) - .build( - new CacheLoader<String, List<String>>() { - @Override - public List<String> load(String key) { - List<String> list = Lists.newArrayList(); - return list; - } - } - ); - - @Override - public LoadingCache<String, List<String>> getBrokerMap() { - return brokerMap; - } - @Override - public LoadingCache<String, List<String>> getTopicMap() { - return topicMap; - } - - @Override - public Map<String, List<String>> jsonDataFile2map(File file) { - List<String> strings; - try { - strings = Files.readLines(file, Charsets.UTF_8); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - StringBuffer sb = new StringBuffer(); - for (String string : strings) { - 
sb.append(string); - } - JSONObject json = (JSONObject) JSONObject.parse(sb.toString()); - Set<Map.Entry<String, Object>> entries = json.entrySet(); - Map<String, List<String>> map = Maps.newHashMap(); - for (Map.Entry<String, Object> entry : entries) { - JSONArray tpsArray = (JSONArray) entry.getValue(); - if (tpsArray == null) { - continue; - } - Object[] tpsStrArray = tpsArray.toArray(); - List<String> tpsList = Lists.newArrayList(); - for (Object tpsObj : tpsStrArray) { - tpsList.add("" + tpsObj); - } - map.put(entry.getKey(), tpsList); - } - return map; - } - - @Override - public Map<String, List<String>> getBrokerCache(String date) { - String dataLocationPath = rmqConfigure.getConsoleCollectData(); - File file = new File(dataLocationPath + date + ".json"); - if (!file.exists()) { - throw Throwables.propagate(new ServiceException(1, "This date have't data!")); - } - return jsonDataFile2map(file); - } - - @Override - public Map<String, List<String>> getTopicCache(String date) { - String dataLocationPath = rmqConfigure.getConsoleCollectData(); - File file = new File(dataLocationPath + date + "_topic" + ".json"); - if (!file.exists()) { - throw Throwables.propagate(new ServiceException(1, "This date have't data!")); - } - return jsonDataFile2map(file); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java deleted file mode 100644 index 3189093..0000000 --- a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.console.service.impl; - -import com.google.common.collect.Lists; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; -import java.util.Map; -import javax.annotation.Resource; -import org.apache.rocketmq.console.service.DashboardCollectService; -import org.apache.rocketmq.console.service.DashboardService; -import org.springframework.stereotype.Service; - -@Service -public class DashboardServiceImpl implements DashboardService { - - @Resource - private DashboardCollectService dashboardCollectService; - /** - * @param date format yyyy-MM-dd - */ - @Override - public Map<String, List<String>> queryBrokerData(String date) { - return dashboardCollectService.getBrokerCache(date); - } - - @Override - public Map<String, List<String>> queryTopicData(String date) { - return dashboardCollectService.getTopicCache(date); - } - - /** - * @param date format yyyy-MM-dd - * @param topicName - */ - @Override - public List<String> queryTopicData(String date, String topicName) { - if (null != dashboardCollectService.getTopicCache(date)) { - return dashboardCollectService.getTopicCache(date).get(topicName); - } - return null; - } - - @Override - public 
List<String> queryTopicCurrentData() { - Date date = new Date(); - DateFormat format = new SimpleDateFormat("yyyy-MM-dd"); - Map<String, List<String>> topicCache = dashboardCollectService.getTopicCache(format.format(date)); - List<String> result = Lists.newArrayList(); - for (Map.Entry<String, List<String>> entry : topicCache.entrySet()) { - List<String> value = entry.getValue(); - result.add(entry.getKey() + "," + value.get(value.size() - 1).split(",")[4]); - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java deleted file mode 100644 index 0205a69..0000000 --- a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.console.service.impl; - -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.List; -import java.util.Set; -import javax.annotation.Resource; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; -import org.apache.rocketmq.client.consumer.PullResult; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.Pair; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.body.Connection; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.console.model.MessageView; -import org.apache.rocketmq.console.service.MessageService; -import org.apache.rocketmq.tools.admin.MQAdminExt; -import org.apache.rocketmq.tools.admin.api.MessageTrack; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -@Service -public class MessageServiceImpl implements MessageService { - - private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class); - /** - * @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64; - * @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch()); - */ - private final static int QUERY_MESSAGE_MAX_NUM = 64; - @Resource - private MQAdminExt mqAdminExt; - - public Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final String msgId) { - try { - - MessageExt messageExt = 
mqAdminExt.viewMessage(subject, msgId); - List<MessageTrack> messageTrackList = messageTrackDetail(messageExt); - return new Pair<>(MessageView.fromMessageExt(messageExt), messageTrackList); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public List<MessageView> queryMessageByTopicAndKey(String topic, String key) { - try { - return Lists.transform(mqAdminExt.queryMessage(topic, key, QUERY_MESSAGE_MAX_NUM, 0, System.currentTimeMillis()).getMessageList(), new Function<MessageExt, MessageView>() { - @Override - public MessageView apply(MessageExt messageExt) { - return MessageView.fromMessageExt(messageExt); - } - }); - } - catch (Exception err) { - throw Throwables.propagate(err); - } - } - - @Override - public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) { - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null); - List<MessageView> messageViewList = Lists.newArrayList(); - try { - String subExpression = "*"; - consumer.start(); - Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic); - for (MessageQueue mq : mqs) { - long minOffset = consumer.searchOffset(mq, begin); - long maxOffset = consumer.searchOffset(mq, end); - READQ: - for (long offset = minOffset; offset <= maxOffset; ) { - try { - if (messageViewList.size() > 2000) { - break; - } - PullResult pullResult = consumer.pull(mq, subExpression, offset, 32); - offset = pullResult.getNextBeginOffset(); - switch (pullResult.getPullStatus()) { - case FOUND: - - List<MessageView> messageViewListByQuery = Lists.transform(pullResult.getMsgFoundList(), new Function<MessageExt, MessageView>() { - @Override - public MessageView apply(MessageExt messageExt) { - messageExt.setBody(null); - return MessageView.fromMessageExt(messageExt); - } - }); - List<MessageView> filteredList = Lists.newArrayList(Iterables.filter(messageViewListByQuery, new Predicate<MessageView>() { - @Override - public 
boolean apply(MessageView messageView) { - if (messageView.getStoreTimestamp() < begin || messageView.getStoreTimestamp() > end) { - logger.info("begin={} end={} time not in range {} {}", begin, end, messageView.getStoreTimestamp(), new Date(messageView.getStoreTimestamp()).toString()); - } - return messageView.getStoreTimestamp() >= begin && messageView.getStoreTimestamp() <= end; - } - })); - messageViewList.addAll(filteredList); - break; - case NO_MATCHED_MSG: - case NO_NEW_MSG: - case OFFSET_ILLEGAL: - break READQ; - } - } - catch (Exception e) { - break; - } - } - } - Collections.sort(messageViewList, new Comparator<MessageView>() { - @Override - public int compare(MessageView o1, MessageView o2) { - if (o1.getStoreTimestamp() - o2.getStoreTimestamp() == 0) { - return 0; - } - return (o1.getStoreTimestamp() > o2.getStoreTimestamp()) ? -1 : 1; - } - }); - return messageViewList; - } - catch (Exception e) { - throw Throwables.propagate(e); - } - finally { - consumer.shutdown(); - } - } - - @Override - public List<MessageTrack> messageTrackDetail(MessageExt msg) { - try { - return mqAdminExt.messageTrackDetail(msg); - } - catch (Exception e) { - logger.error("op=messageTrackDetailError", e); - return Collections.emptyList(); - } - } - - - @Override - public ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup, - String clientId) { - if (StringUtils.isNotBlank(clientId)) { - try { - return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - try { - ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); - for (Connection connection : consumerConnection.getConnectionSet()) { - if (StringUtils.isBlank(connection.getClientId())) { - continue; - } - logger.info("clientId={}", connection.getClientId()); - return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), 
topic, msgId); - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } - throw new IllegalStateException("NO CONSUMER"); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java deleted file mode 100644 index d3a109d..0000000 --- a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.rocketmq.console.service.impl;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Throwables;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.model.ConsumerMonitorConfig;
import org.apache.rocketmq.console.service.MonitorService;
import org.apache.rocketmq.console.util.JsonUtil;
import org.springframework.stereotype.Service;

/**
 * Keeps the per-consumer-group monitor configuration in memory and mirrors every change to a
 * JSON file under the console data directory.
 *
 * <p>The in-memory map is the source for all queries; the file is only read once, from
 * {@link #loadData()}, after dependency injection has completed.
 */
@Service
public class MonitorServiceImpl implements MonitorService {

    @Resource
    private RMQConfigure rmqConfigure;

    /** Consumer group name -> monitor configuration. Never reassigned, so it can never become null. */
    private final Map<String, ConsumerMonitorConfig> configMap = new ConcurrentHashMap<>();

    /**
     * Stores (or replaces) the monitor configuration for a consumer group and persists the
     * whole map.
     *
     * <p>If persisting fails, the in-memory change is undone before the exception is rethrown,
     * so memory and disk do not silently diverge. The rollback is best-effort: it is not atomic
     * with respect to other callers modifying the same key at the same time.
     *
     * @param name consumer group name used as the map key
     * @param config configuration to store
     * @return always {@code true}; failures are reported by exception
     */
    @Override
    public boolean createOrUpdateConsumerMonitor(String name, ConsumerMonitorConfig config) {
        ConsumerMonitorConfig previous = configMap.put(name, config);
        try {
            writeToFile(getConsumerMonitorConfigDataPath(), configMap);
        }
        catch (RuntimeException e) {
            // Undo the in-memory update so the map keeps matching what is on disk.
            if (previous == null) {
                configMap.remove(name);
            }
            else {
                configMap.put(name, previous);
            }
            throw e;
        }
        return true;
    }

    /**
     * Returns the live backing map (not a copy); changes made through it are visible to this
     * service but are not persisted.
     */
    @Override
    public Map<String, ConsumerMonitorConfig> queryConsumerMonitorConfig() {
        return configMap;
    }

    /**
     * @param consumeGroupName consumer group name
     * @return the stored configuration, or {@code null} when none is stored for that group
     */
    @Override
    public ConsumerMonitorConfig queryConsumerMonitorConfigByGroupName(String consumeGroupName) {
        return configMap.get(consumeGroupName);
    }

    /**
     * Removes the monitor configuration of a consumer group and persists the remaining map.
     *
     * <p>If persisting fails, the removed entry is put back before the exception is rethrown
     * (best-effort, same caveat as {@link #createOrUpdateConsumerMonitor}).
     *
     * @param consumeGroupName consumer group name
     * @return always {@code true}, even when no entry existed; failures are reported by exception
     */
    @Override
    public boolean deleteConsumerMonitor(String consumeGroupName) {
        ConsumerMonitorConfig removed = configMap.remove(consumeGroupName);
        try {
            writeToFile(getConsumerMonitorConfigDataPath(), configMap);
        }
        catch (RuntimeException e) {
            // Restore the entry so the map keeps matching what is on disk.
            if (removed != null) {
                configMap.put(consumeGroupName, removed);
            }
            throw e;
        }
        return true;
    }

    //rocketmq.console.data.path/monitor/consumerMonitorConfig.json
    private String getConsumerMonitorConfigDataPath() {
        return rmqConfigure.getRocketMqConsoleDataPath() + File.separatorChar + "monitor" + File.separatorChar + "consumerMonitorConfig.json";
    }

    // Only read by loadData(); nothing in this class writes it.
    // NOTE(review): presumably MixAll.string2File maintains the ".bak" copy - confirm.
    private String getConsumerMonitorConfigDataPathBackUp() {
        return getConsumerMonitorConfigDataPath() + ".bak";
    }

    /** Serializes {@code data} to JSON and writes it to {@code path}. */
    private void writeToFile(String path, Object data) {
        writeDataJsonToFile(path, JsonUtil.obj2String(data));
    }

    /** Writes the JSON text to {@code path}, rethrowing any failure unchecked. */
    private void writeDataJsonToFile(String path, String dataStr) {
        try {
            MixAll.string2File(dataStr, path);
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Loads the persisted configuration once the bean has been wired: the main file first,
     * then the ".bak" file if the main one yields no content. Does nothing when neither yields
     * content.
     */
    @PostConstruct
    private void loadData() {
        String content = MixAll.file2String(getConsumerMonitorConfigDataPath());
        if (content == null) {
            content = MixAll.file2String(getConsumerMonitorConfigDataPathBackUp());
        }
        if (content == null) {
            return;
        }
        Map<String, ConsumerMonitorConfig> loaded = JsonUtil.string2Obj(content, new TypeReference<ConcurrentHashMap<String, ConsumerMonitorConfig>>() {
        });
        // Merge instead of replacing the field: a null result (e.g. unparsable content) must not
        // turn configMap into null and break every later call.
        if (loaded != null) {
            configMap.putAll(loaded);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java deleted file mode 100644 index 84e6d2f..0000000 --- a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.rocketmq.console.service.impl;

import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.service.AbstractCommonService;
import org.apache.rocketmq.console.service.OpsService;
import org.apache.rocketmq.console.service.checker.CheckerType;
import org.apache.rocketmq.console.service.checker.RocketMqChecker;
import org.springframework.stereotype.Service;

/**
 * Operations-page service: exposes and updates the console's name server address and VIP
 * channel settings held by {@link RMQConfigure}, and runs the injected status checkers.
 */
@Service
public class OpsServiceImpl extends AbstractCommonService implements OpsService {

    @Resource
    private RMQConfigure rMQConfigure;

    @Resource
    private List<RocketMqChecker> rocketMqCheckerList;

    /**
     * Builds the data shown on the home page.
     *
     * @return a map with {@code "namesvrAddrList"} (the configured address string split on
     *         {@code ";"}) and {@code "useVIPChannel"} (the configured flag as a {@link Boolean})
     */
    @Override
    public Map<String, Object> homePageInfo() {
        final List<String> nameServers = Splitter.on(";").splitToList(rMQConfigure.getNamesrvAddr());
        final Boolean vipChannel = Boolean.valueOf(rMQConfigure.getIsVIPChannel());

        final Map<String, Object> info = Maps.newHashMap();
        info.put("namesvrAddrList", nameServers);
        info.put("useVIPChannel", vipChannel);
        return info;
    }

    /** Replaces the configured name server address string with the given value. */
    @Override
    public void updateNameSvrAddrList(String nameSvrAddrList) {
        rMQConfigure.setNamesrvAddr(nameSvrAddrList);
    }

    /** @return the configured name server address string, exactly as stored */
    @Override
    public String getNameSvrList() {
        return rMQConfigure.getNamesrvAddr();
    }

    /**
     * Runs every injected checker once.
     *
     * @return each checker's result keyed by its {@link CheckerType}; if two checkers report
     *         the same type, the later one in the list wins
     */
    @Override
    public Map<CheckerType, Object> rocketMqStatusCheck() {
        final Map<CheckerType, Object> resultsByType = Maps.newHashMap();
        for (final RocketMqChecker checker : rocketMqCheckerList) {
            final CheckerType type = checker.checkerType();
            final Object outcome = checker.doCheck();
            resultsByType.put(type, outcome);
        }
        return resultsByType;
    }

    /**
     * Stores the VIP channel flag.
     *
     * @return always {@code true}
     */
    @Override
    public boolean updateIsVIPChannel(String useVIPChannel) {
        rMQConfigure.setIsVIPChannel(useVIPChannel);
        return true;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java deleted file mode 100644 index 3e46958..0000000 --- a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.rocketmq.console.service.impl;

import com.google.common.base.Throwables;
import javax.annotation.Resource;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.console.service.ProducerService;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.springframework.stereotype.Service;

/**
 * Producer-side queries, delegated to the injected {@link MQAdminExt}.
 */
@Service
public class ProducerServiceImpl implements ProducerService {
    @Resource
    private MQAdminExt mqAdminExt;

    /**
     * Looks up the connection information of a producer group for a topic.
     *
     * @param producerGroup producer group to inspect
     * @param topic topic passed through to the admin client
     * @return whatever the admin client reports for that group and topic
     * @throws RuntimeException any failure of the admin call, rethrown unchecked
     */
    @Override
    public ProducerConnection getProducerConnection(String producerGroup, String topic) {
        final ProducerConnection connection;
        try {
            connection = mqAdminExt.examineProducerConnectionInfo(producerGroup, topic);
        }
        catch (Exception failure) {
            throw Throwables.propagate(failure);
        }
        return connection;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/2a8ff251/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java b/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java deleted file mode 100644 index 117bcfd..0000000 --- a/rocketmq-console-ng/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.rocketmq.console.service.impl;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.console.config.RMQConfigure;
import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.console.model.request.TopicConfigInfo;
import org.apache.rocketmq.console.service.AbstractCommonService;
import org.apache.rocketmq.console.service.TopicService;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Topic management for the console: listing, inspecting, creating/updating and deleting
 * topics, plus sending a test message.
 *
 * <p>All cluster operations go through {@code mqAdminExt}, which is not declared in this class
 * and is therefore inherited from {@link AbstractCommonService}. Every failure of an admin call
 * is rethrown unchecked via {@link Throwables#propagate(Throwable)}.
 */
@Service
public class TopicServiceImpl extends AbstractCommonService implements TopicService {

    @Autowired
    private RMQConfigure rMQConfigure;

    /** @return the topic list reported by the admin client */
    @Override
    public TopicList fetchAllTopicList() {
        try {
            return mqAdminExt.fetchAllTopicList();
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /** @return the statistics table the admin client reports for {@code topic} */
    @Override
    public TopicStatsTable stats(String topic) {
        try {
            return mqAdminExt.examineTopicStats(topic);
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /** @return the route data the admin client reports for {@code topic} */
    @Override
    public TopicRouteData route(String topic) {
        try {
            return mqAdminExt.examineTopicRouteInfo(topic);
        }
        catch (Exception ex) {
            throw Throwables.propagate(ex);
        }
    }

    /** @return the consumer groups the admin client reports as consuming {@code topic} */
    @Override
    public GroupList queryTopicConsumerInfo(String topic) {
        try {
            return mqAdminExt.queryTopicConsumeByWho(topic);
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Creates or updates a topic on every broker selected by the request.
     *
     * <p>The request's bean properties are copied onto a fresh {@link TopicConfig}; the target
     * brokers are the names returned by {@code changeToBrokerNameSet} for the request's cluster
     * and broker name lists.
     *
     * @param topicCreateOrUpdateRequest topic settings plus the clusters/brokers to apply them to
     * @throws IllegalArgumentException if a selected broker name is not in the cluster info
     */
    @Override
    public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
        TopicConfig topicConfig = new TopicConfig();
        BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
        try {
            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
            for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
                topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) {
                mqAdminExt.createAndUpdateTopicConfig(resolveBrokerAddr(clusterInfo, brokerName), topicConfig);
            }
        }
        catch (Exception err) {
            throw Throwables.propagate(err);
        }
    }

    /**
     * Reads the configuration of {@code topic} from one broker.
     *
     * @param topic topic name
     * @param brokerName broker to query; must be present in the current cluster info
     * @return the topic configuration reported by that broker
     * @throws IllegalArgumentException if {@code brokerName} is not in the cluster info
     */
    @Override
    public TopicConfig examineTopicConfig(String topic, String brokerName) {
        try {
            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
            return mqAdminExt.examineTopicConfig(resolveBrokerAddr(clusterInfo, brokerName), topic);
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Reads the configuration of {@code topic} from every broker listed in its route data.
     *
     * @return one entry per broker, each carrying that broker's name as its only broker name
     */
    @Override
    public List<TopicConfigInfo> examineTopicConfig(String topic) {
        List<TopicConfigInfo> topicConfigInfoList = Lists.newArrayList();
        TopicRouteData topicRouteData = route(topic);
        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            TopicConfigInfo topicConfigInfo = new TopicConfigInfo();
            TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName());
            BeanUtils.copyProperties(topicConfig, topicConfigInfo);
            topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
            topicConfigInfoList.add(topicConfigInfo);
        }
        return topicConfigInfoList;
    }

    /**
     * Deletes {@code topic} from the master brokers of one cluster and then from the name
     * servers. A blank {@code clusterName} delegates to {@link #deleteTopic(String)}.
     *
     * @return always {@code true}; failures are reported by exception
     */
    @Override
    public boolean deleteTopic(String topic, String clusterName) {
        try {
            if (StringUtils.isBlank(clusterName)) {
                return deleteTopic(topic);
            }
            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, clusterName);
            mqAdminExt.deleteTopicInBroker(masterSet, topic);
            // Stays null when no name server address is configured; null is passed through
            // unchanged, exactly as before. How the admin client treats null is not visible here.
            Set<String> nameServerSet = null;
            if (StringUtils.isNotBlank(rMQConfigure.getNamesrvAddr())) {
                String[] ns = rMQConfigure.getNamesrvAddr().split(";");
                nameServerSet = new HashSet<String>(Arrays.asList(ns));
            }
            mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
        }
        catch (Exception err) {
            throw Throwables.propagate(err);
        }
        return true;
    }

    /**
     * Deletes {@code topic} from every cluster in the current cluster info, one cluster at a
     * time via {@link #deleteTopic(String, String)}.
     *
     * @return always {@code true}; failures are reported by exception
     */
    @Override
    public boolean deleteTopic(String topic) {
        ClusterInfo clusterInfo = null;
        try {
            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
        }
        catch (Exception err) {
            throw Throwables.propagate(err);
        }
        for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
            deleteTopic(topic, clusterName);
        }
        return true;
    }

    /**
     * Deletes {@code topic} from a single broker only.
     *
     * @return always {@code true}; failures are reported by exception
     * @throws IllegalArgumentException if {@code brokerName} is not in the cluster info
     */
    @Override
    public boolean deleteTopicInBroker(String brokerName, String topic) {
        // A single try is enough: propagate() rethrows runtime exceptions unchanged, so the
        // former nested try/catch added nothing.
        try {
            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
            mqAdminExt.deleteTopicInBroker(Sets.newHashSet(resolveBrokerAddr(clusterInfo, brokerName)), topic);
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
        return true;
    }

    /**
     * Sends one message built from the request using a short-lived producer that is started
     * for this call and always shut down afterwards.
     *
     * <p>The message body is encoded as UTF-8 so the bytes on the wire do not depend on the
     * JVM's platform default charset.
     *
     * @param sendTopicMessageRequest topic, tag, key and text body of the message
     * @return the send result reported by the producer
     */
    @Override
    public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
        DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
        producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
        try {
            producer.start();
            Message msg = new Message(sendTopicMessageRequest.getTopic(),
                sendTopicMessageRequest.getTag(),
                sendTopicMessageRequest.getKey(),
                sendTopicMessageRequest.getMessageBody().getBytes(StandardCharsets.UTF_8)
            );
            return producer.send(msg);
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
        finally {
            producer.shutdown();
        }
    }

    /**
     * Looks up a broker by name in the cluster info and selects its address, failing with a
     * descriptive error instead of a bare NullPointerException when the name is unknown.
     */
    private static String resolveBrokerAddr(ClusterInfo clusterInfo, String brokerName) {
        BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName);
        if (brokerData == null) {
            throw new IllegalArgumentException("Unknown broker name: " + brokerName);
        }
        return brokerData.selectBrokerAddr();
    }

}
