http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java new file mode 100644 index 0000000..18450c6 --- /dev/null +++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -0,0 +1,815 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.namesrv.routeinfo; + +import com.alibaba.rocketmq.common.DataVersion; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.TopicConfig; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.constant.PermName; +import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult; +import com.alibaba.rocketmq.common.protocol.body.ClusterInfo; +import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import com.alibaba.rocketmq.common.protocol.body.TopicList; +import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.common.protocol.route.QueueData; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.common.sysflag.TopicSysFlag; +import com.alibaba.rocketmq.remoting.common.RemotingUtil; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +/** + * @author shijia.wxr + */ +public class RouteInfoManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); + private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; + private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; + private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; + private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; + private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; + + + public RouteInfoManager() { + this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); + this.brokerAddrTable = new HashMap<String, BrokerData>(128); + this.clusterAddrTable = new HashMap<String, Set<String>>(32); + this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); + this.filterServerTable = new HashMap<String, List<String>>(256); + } + + public byte[] getAllClusterInfo() { + ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo(); + clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable); + clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable); + return clusterInfoSerializeWrapper.encode(); + } + + public void deleteTopic(final String topic) { + try { + try { + this.lock.writeLock().lockInterruptibly(); + this.topicQueueTable.remove(topic); + } finally { + this.lock.writeLock().unlock(); + } + } catch (Exception e) { + log.error("deleteTopic Exception", e); + } + } + + public byte[] getAllTopicList() { + TopicList topicList = new TopicList(); + try { + try { + this.lock.readLock().lockInterruptibly(); + topicList.getTopicList().addAll(this.topicQueueTable.keySet()); + } finally { + this.lock.readLock().unlock(); + } + } catch (Exception e) { + log.error("getAllTopicList Exception", e); + } + + return topicList.encode(); + } + + public RegisterBrokerResult registerBroker( + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId, + final String haServerAddr, + final TopicConfigSerializeWrapper topicConfigWrapper, + final List<String> filterServerList, + final Channel channel) { + RegisterBrokerResult result = new RegisterBrokerResult(); + try { + try { + this.lock.writeLock().lockInterruptibly(); + + + Set<String> brokerNames = this.clusterAddrTable.get(clusterName); + if (null == brokerNames) { + brokerNames = new HashSet<String>(); + this.clusterAddrTable.put(clusterName, brokerNames); + } + brokerNames.add(brokerName); + + boolean registerFirst = false; + + + BrokerData brokerData = this.brokerAddrTable.get(brokerName); + if (null == brokerData) { + registerFirst = true; + brokerData = new BrokerData(); + brokerData.setBrokerName(brokerName); + HashMap<Long, String> brokerAddrs = new HashMap<Long, String>(); + brokerData.setBrokerAddrs(brokerAddrs); + + this.brokerAddrTable.put(brokerName, brokerData); + } + String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); + registerFirst = registerFirst || (null == oldAddr); + + + if (null != topicConfigWrapper // + && MixAll.MASTER_ID == brokerId) { + if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())// + || registerFirst) { + ConcurrentHashMap<String, TopicConfig> tcTable = + topicConfigWrapper.getTopicConfigTable(); + if (tcTable != null) { + for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { + this.createAndUpdateQueueData(brokerName, entry.getValue()); + } + } + } + } + + + BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, + new BrokerLiveInfo( + System.currentTimeMillis(), + topicConfigWrapper.getDataVersion(), + channel, + haServerAddr)); + if (null == prevBrokerLiveInfo) { + log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr); + } + + + if (filterServerList != null) { + if (filterServerList.isEmpty()) { + this.filterServerTable.remove(brokerAddr); + } else { + this.filterServerTable.put(brokerAddr, filterServerList); + } + } + + + if (MixAll.MASTER_ID != brokerId) { + String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); + if (masterAddr != null) { + BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); + if (brokerLiveInfo != null) { + result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); + result.setMasterAddr(masterAddr); + } + } + } + } finally { + this.lock.writeLock().unlock(); + } + } catch (Exception e) { + log.error("registerBroker Exception", e); + } + + return result; + } + + private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) { + BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr); + if (null == prev || !prev.getDataVersion().equals(dataVersion)) { + return true; + } + + return false; + } + + private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { + QueueData queueData = new QueueData(); + queueData.setBrokerName(brokerName); + queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); + queueData.setReadQueueNums(topicConfig.getReadQueueNums()); + queueData.setPerm(topicConfig.getPerm()); + queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); + + List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); + if (null == queueDataList) { + queueDataList = new LinkedList<QueueData>(); + queueDataList.add(queueData); + this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); + log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData); + } else { + boolean addNewOne = true; + + Iterator<QueueData> it = queueDataList.iterator(); + while (it.hasNext()) { + QueueData qd = it.next(); + if (qd.getBrokerName().equals(brokerName)) { + if (qd.equals(queueData)) { + addNewOne = false; + } else { + log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, + queueData); + it.remove(); + } + } + } + + if (addNewOne) { + queueDataList.add(queueData); + } + } + } + + public int wipeWritePermOfBrokerByLock(final String brokerName) { + try { + try { + this.lock.writeLock().lockInterruptibly(); + return wipeWritePermOfBroker(brokerName); + } finally { + this.lock.writeLock().unlock(); + } + } catch (Exception e) { + log.error("wipeWritePermOfBrokerByLock Exception", e); + } + + return 0; + } + + private int wipeWritePermOfBroker(final String brokerName) { + int wipeTopicCnt = 0; + Iterator<Entry<String, List<QueueData>>> itTopic = this.topicQueueTable.entrySet().iterator(); + while (itTopic.hasNext()) { + Entry<String, List<QueueData>> entry = itTopic.next(); + List<QueueData> qdList = entry.getValue(); + + Iterator<QueueData> it = qdList.iterator(); + while (it.hasNext()) { + QueueData qd = it.next(); + if (qd.getBrokerName().equals(brokerName)) { + int perm = qd.getPerm(); + perm &= ~PermName.PERM_WRITE; + qd.setPerm(perm); + wipeTopicCnt++; + } + } + } + + return wipeTopicCnt; + } + + public void unregisterBroker( + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId) { + try { + try { + this.lock.writeLock().lockInterruptibly(); + BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr); + if (brokerLiveInfo != null) { + log.info("unregisterBroker, remove from brokerLiveTable {}, {}", + brokerLiveInfo != null ? "OK" : "Failed", + brokerAddr + ); + } + + this.filterServerTable.remove(brokerAddr); + + boolean removeBrokerName = false; + BrokerData brokerData = this.brokerAddrTable.get(brokerName); + if (null != brokerData) { + String addr = brokerData.getBrokerAddrs().remove(brokerId); + log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}", + addr != null ? "OK" : "Failed", + brokerAddr + ); + + if (brokerData.getBrokerAddrs().isEmpty()) { + this.brokerAddrTable.remove(brokerName); + log.info("unregisterBroker, remove name from brokerAddrTable OK, {}", + brokerName + ); + + removeBrokerName = true; + } + } + + if (removeBrokerName) { + Set<String> nameSet = this.clusterAddrTable.get(clusterName); + if (nameSet != null) { + boolean removed = nameSet.remove(brokerName); + log.info("unregisterBroker, remove name from clusterAddrTable {}, {}", + removed ? "OK" : "Failed", + brokerName); + + if (nameSet.isEmpty()) { + this.clusterAddrTable.remove(clusterName); + log.info("unregisterBroker, remove cluster from clusterAddrTable {}", + clusterName + ); + } + } + this.removeTopicByBrokerName(brokerName); + } + } finally { + this.lock.writeLock().unlock(); + } + } catch (Exception e) { + log.error("unregisterBroker Exception", e); + } + } + + private void removeTopicByBrokerName(final String brokerName) { + Iterator<Entry<String, List<QueueData>>> itMap = this.topicQueueTable.entrySet().iterator(); + while (itMap.hasNext()) { + Entry<String, List<QueueData>> entry = itMap.next(); + + String topic = entry.getKey(); + List<QueueData> queueDataList = entry.getValue(); + Iterator<QueueData> it = queueDataList.iterator(); + while (it.hasNext()) { + QueueData qd = it.next(); + if (qd.getBrokerName().equals(brokerName)) { + log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, qd); + it.remove(); + } + } + + if (queueDataList.isEmpty()) { + log.info("removeTopicByBrokerName, remove the topic all queue {}", topic); + itMap.remove(); + } + } + } + + public TopicRouteData pickupTopicRouteData(final String topic) { + TopicRouteData topicRouteData = new TopicRouteData(); + boolean foundQueueData = false; + boolean foundBrokerData = false; + Set<String> brokerNameSet = new HashSet<String>(); + List<BrokerData> brokerDataList = new LinkedList<BrokerData>(); + topicRouteData.setBrokerDatas(brokerDataList); + + HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>(); + topicRouteData.setFilterServerTable(filterServerMap); + + try { + try { + this.lock.readLock().lockInterruptibly(); + List<QueueData> queueDataList = this.topicQueueTable.get(topic); + if (queueDataList != null) { + topicRouteData.setQueueDatas(queueDataList); + foundQueueData = true; + + + Iterator<QueueData> it = queueDataList.iterator(); + while (it.hasNext()) { + QueueData qd = it.next(); + brokerNameSet.add(qd.getBrokerName()); + } + + for (String brokerName : brokerNameSet) { + BrokerData brokerData = this.brokerAddrTable.get(brokerName); + if (null != brokerData) { + BrokerData brokerDataClone = new BrokerData(); + brokerDataClone.setBrokerName(brokerData.getBrokerName()); + brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData + .getBrokerAddrs().clone()); + brokerDataList.add(brokerDataClone); + foundBrokerData = true; + for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { + List<String> filterServerList = this.filterServerTable.get(brokerAddr); + filterServerMap.put(brokerAddr, filterServerList); + } + } + } + } + } finally { + this.lock.readLock().unlock(); + } + } catch (Exception e) { + log.error("pickupTopicRouteData Exception", e); + } + + if (log.isDebugEnabled()) { + log.debug("pickupTopicRouteData {} {}", topic, topicRouteData); + } + + if (foundBrokerData && foundQueueData) { + return topicRouteData; + } + + return null; + } + + public void scanNotActiveBroker() { + Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, BrokerLiveInfo> next = it.next(); + long last = next.getValue().getLastUpdateTimestamp(); + if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { + RemotingUtil.closeChannel(next.getValue().getChannel()); + it.remove(); + log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); + this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); + } + } + } + + public void onChannelDestroy(String remoteAddr, Channel channel) { + String brokerAddrFound = null; + if (channel != null) { + try { + try { + this.lock.readLock().lockInterruptibly(); + Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = + this.brokerLiveTable.entrySet().iterator(); + while (itBrokerLiveTable.hasNext()) { + Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next(); + if (entry.getValue().getChannel() == channel) { + brokerAddrFound = entry.getKey(); + break; + } + } + } finally { + this.lock.readLock().unlock(); + } + } catch (Exception e) { + log.error("onChannelDestroy Exception", e); + } + } + + if (null == brokerAddrFound) { + brokerAddrFound = remoteAddr; + } else { + log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound); + } + + + if (brokerAddrFound != null && brokerAddrFound.length() > 0) { + + try { + try { + this.lock.writeLock().lockInterruptibly(); + this.brokerLiveTable.remove(brokerAddrFound); + this.filterServerTable.remove(brokerAddrFound); + String brokerNameFound = null; + boolean removeBrokerName = false; + Iterator<Entry<String, BrokerData>> itBrokerAddrTable = + this.brokerAddrTable.entrySet().iterator(); + while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { + BrokerData brokerData = itBrokerAddrTable.next().getValue(); + + Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); + while (it.hasNext()) { + Entry<Long, String> entry = it.next(); + Long brokerId = entry.getKey(); + String brokerAddr = entry.getValue(); + if (brokerAddr.equals(brokerAddrFound)) { + brokerNameFound = brokerData.getBrokerName(); + it.remove(); + log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", + brokerId, brokerAddr); + break; + } + } + + if (brokerData.getBrokerAddrs().isEmpty()) { + removeBrokerName = true; + itBrokerAddrTable.remove(); + log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", + brokerData.getBrokerName()); + } + } + + if (brokerNameFound != null && removeBrokerName) { + Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, Set<String>> entry = it.next(); + String clusterName = entry.getKey(); + Set<String> brokerNames = entry.getValue(); + boolean removed = brokerNames.remove(brokerNameFound); + if (removed) { + log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", + brokerNameFound, clusterName); + + + if (brokerNames.isEmpty()) { + log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", + clusterName); + it.remove(); + } + + break; + } + } + } + + if (removeBrokerName) { + Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = + this.topicQueueTable.entrySet().iterator(); + while (itTopicQueueTable.hasNext()) { + Entry<String, List<QueueData>> entry = itTopicQueueTable.next(); + String topic = entry.getKey(); + List<QueueData> queueDataList = entry.getValue(); + + Iterator<QueueData> itQueueData = queueDataList.iterator(); + while (itQueueData.hasNext()) { + QueueData queueData = itQueueData.next(); + if (queueData.getBrokerName().equals(brokerNameFound)) { + itQueueData.remove(); + log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", + topic, queueData); + } + } + + if (queueDataList.isEmpty()) { + itTopicQueueTable.remove(); + log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", + topic); + } + } + } + } finally { + this.lock.writeLock().unlock(); + } + } catch (Exception e) { + log.error("onChannelDestroy Exception", e); + } + } + } + + public void printAllPeriodically() { + try { + try { + this.lock.readLock().lockInterruptibly(); + log.info("--------------------------------------------------------"); + { + log.info("topicQueueTable SIZE: {}", this.topicQueueTable.size()); + Iterator<Entry<String, List<QueueData>>> it = this.topicQueueTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, List<QueueData>> next = it.next(); + log.info("topicQueueTable Topic: {} {}", next.getKey(), next.getValue()); + } + } + + { + log.info("brokerAddrTable SIZE: {}", this.brokerAddrTable.size()); + Iterator<Entry<String, BrokerData>> it = this.brokerAddrTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, BrokerData> next = it.next(); + log.info("brokerAddrTable brokerName: {} {}", next.getKey(), next.getValue()); + } + } + + { + log.info("brokerLiveTable SIZE: {}", this.brokerLiveTable.size()); + Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, BrokerLiveInfo> next = it.next(); + log.info("brokerLiveTable brokerAddr: {} {}", next.getKey(), next.getValue()); + } + } + + { + log.info("clusterAddrTable SIZE: {}", this.clusterAddrTable.size()); + Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, Set<String>> next = it.next(); + log.info("clusterAddrTable clusterName: {} {}", next.getKey(), next.getValue()); + } + } + } finally { + this.lock.readLock().unlock(); + } + } catch (Exception e) { + log.error("printAllPeriodically Exception", e); + } + } + + + public byte[] getSystemTopicList() { + TopicList topicList = new TopicList(); + try { + try { + this.lock.readLock().lockInterruptibly(); + for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) { + topicList.getTopicList().add(entry.getKey()); + topicList.getTopicList().addAll(entry.getValue()); + } + + if (brokerAddrTable != null && !brokerAddrTable.isEmpty()) { + Iterator<String> it = brokerAddrTable.keySet().iterator(); + while (it.hasNext()) { + BrokerData bd = brokerAddrTable.get(it.next()); + HashMap<Long, String> brokerAddrs = bd.getBrokerAddrs(); + if (bd.getBrokerAddrs() != null && !bd.getBrokerAddrs().isEmpty()) { + Iterator<Long> it2 = brokerAddrs.keySet().iterator(); + topicList.setBrokerAddr(brokerAddrs.get(it2.next())); + break; + } + } + } + } finally { + this.lock.readLock().unlock(); + } + } catch (Exception e) { + log.error("getAllTopicList Exception", e); + } + + return topicList.encode(); + } + + public byte[] getTopicsByCluster(String cluster) { + TopicList topicList = new TopicList(); + try { + try { + this.lock.readLock().lockInterruptibly(); + Set<String> brokerNameSet = this.clusterAddrTable.get(cluster); + for (String brokerName : brokerNameSet) { + Iterator<Entry<String, List<QueueData>>> topicTableIt = + this.topicQueueTable.entrySet().iterator(); + while (topicTableIt.hasNext()) { + Entry<String, List<QueueData>> topicEntry = topicTableIt.next(); + String topic = topicEntry.getKey(); + List<QueueData> queueDatas = topicEntry.getValue(); + for (QueueData queueData : queueDatas) { + if (brokerName.equals(queueData.getBrokerName())) { + topicList.getTopicList().add(topic); + break; + } + } + } + } + } finally { + this.lock.readLock().unlock(); + } + } catch (Exception e) { + log.error("getAllTopicList Exception", e); + } + + return topicList.encode(); + } + + public byte[] getUnitTopics() { + TopicList topicList = new TopicList(); + try { + try { + this.lock.readLock().lockInterruptibly(); + Iterator<Entry<String, List<QueueData>>> topicTableIt = + this.topicQueueTable.entrySet().iterator(); + while (topicTableIt.hasNext()) { + Entry<String, List<QueueData>> topicEntry = topicTableIt.next(); + String topic = topicEntry.getKey(); + List<QueueData> queueDatas = topicEntry.getValue(); + if (queueDatas != null && queueDatas.size() > 0 + && TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())) { + topicList.getTopicList().add(topic); + } + } + } finally { + this.lock.readLock().unlock(); + } + } catch (Exception e) { + log.error("getAllTopicList Exception", e); + } + + return topicList.encode(); + } + + public byte[] getHasUnitSubTopicList() { + TopicList topicList = new TopicList(); + try { + try { + this.lock.readLock().lockInterruptibly(); + Iterator<Entry<String, List<QueueData>>> topicTableIt = + this.topicQueueTable.entrySet().iterator(); + while (topicTableIt.hasNext()) { + Entry<String, List<QueueData>> topicEntry = topicTableIt.next(); + String topic = topicEntry.getKey(); + List<QueueData> queueDatas = topicEntry.getValue(); + if (queueDatas != null && queueDatas.size() > 0 + && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) { + topicList.getTopicList().add(topic); + } + } + } finally { + this.lock.readLock().unlock(); + } + } catch (Exception e) { + log.error("getAllTopicList Exception", e); + } + + return topicList.encode(); + } + + public byte[] getHasUnitSubUnUnitTopicList() { + TopicList topicList = new TopicList(); + try { + try { + this.lock.readLock().lockInterruptibly(); + Iterator<Entry<String, List<QueueData>>> topicTableIt = + this.topicQueueTable.entrySet().iterator(); + while (topicTableIt.hasNext()) { + Entry<String, List<QueueData>> topicEntry = topicTableIt.next(); + String topic = topicEntry.getKey(); + List<QueueData> queueDatas = topicEntry.getValue(); + if (queueDatas != null && queueDatas.size() > 0 + && !TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag()) + && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) { + topicList.getTopicList().add(topic); + } + } + } finally { + this.lock.readLock().unlock(); + } + } catch (Exception e) { + log.error("getAllTopicList Exception", e); + } + + return topicList.encode(); + } +} + + +class BrokerLiveInfo { + private long lastUpdateTimestamp; + private DataVersion dataVersion; + private Channel channel; + private String haServerAddr; + + + public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel, + String haServerAddr) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + this.dataVersion = dataVersion; + this.channel = channel; + this.haServerAddr = haServerAddr; + } + + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } + + + public DataVersion getDataVersion() { + return dataVersion; + } + + + public void setDataVersion(DataVersion dataVersion) { + this.dataVersion = dataVersion; + } + + + public Channel getChannel() { + return channel; + } + + + public void setChannel(Channel channel) { + this.channel = channel; + } + + + public String getHaServerAddr() { + return haServerAddr; + } + + + public void setHaServerAddr(String haServerAddr) { + this.haServerAddr = haServerAddr; + } + + + @Override + public String toString() { + return "BrokerLiveInfo [lastUpdateTimestamp=" + lastUpdateTimestamp + ", dataVersion=" + dataVersion + + ", channel=" + channel + ", haServerAddr=" + haServerAddr + "]"; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/pom.xml b/rocketmq-remoting/pom.xml new file mode 100644 index 0000000..b229597 --- /dev/null +++ b/rocketmq-remoting/pom.xml @@ -0,0 +1,51 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain producerGroup copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-all</artifactId> + <version>4.0.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <artifactId>rocketmq-remoting</artifactId> + <name>rocketmq-remoting ${project.version}</name> + + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java new file mode 100644 index 0000000..eff9551 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting; + +import io.netty.channel.Channel; + + +/** + * @author shijia.wxr + * + */ +public interface ChannelEventListener { + void onChannelConnect(final String remoteAddr, final Channel channel); + + + void onChannelClose(final String remoteAddr, final Channel channel); + + + void onChannelException(final String remoteAddr, final Channel channel); + + + void onChannelIdle(final String remoteAddr, final Channel channel); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java new file mode 100644 index 0000000..7e0bd8c --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting; + +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; + + +/** + * @author shijia.wxr + */ +public interface CommandCustomHeader { + void checkFields() throws RemotingCommandException; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java new file mode 100644 index 0000000..6ba27e1 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting; + +import com.alibaba.rocketmq.remoting.netty.ResponseFuture; + + +/** + * @author shijia.wxr + * + */ +public interface InvokeCallback { + public void operationComplete(final ResponseFuture responseFuture); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java new file mode 100644 index 0000000..cc2d594 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.remoting; + +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; + + +public interface RPCHook { + void doBeforeRequest(final String remoteAddr, final RemotingCommand request); + + + void doAfterResponse(final String remoteAddr, final RemotingCommand request, + final RemotingCommand response); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java new file mode 100644 index 0000000..ad8c0be --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting; + +import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; + +import java.util.List; +import java.util.concurrent.ExecutorService; + + +/** + * @author shijia.wxr + * + */ +public interface RemotingClient extends RemotingService { + + public void updateNameServerAddressList(final List<String> addrs); + + + public List<String> getNameServerAddressList(); + + + public RemotingCommand invokeSync(final String addr, final RemotingCommand request, + final long timeoutMillis) throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException; + + + public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis, + final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, + RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; + + + public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis) + throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, + RemotingTimeoutException, RemotingSendRequestException; + + + public void registerProcessor(final int requestCode, final NettyRequestProcessor processor, + final ExecutorService executor); + + + public boolean isChannelWriteable(final String addr); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java new file mode 100644 index 0000000..ae84c1b --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting; + +import com.alibaba.rocketmq.remoting.common.Pair; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.Channel; + +import java.util.concurrent.ExecutorService; + + +/** + * @author shijia.wxr + * + */ +public interface RemotingServer extends RemotingService { + + void registerProcessor(final int requestCode, final NettyRequestProcessor processor, + final ExecutorService executor); + + + void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); + + + int localListenPort(); + + + Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); + + + RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, + final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, + RemotingTimeoutException; + + + void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, + final InvokeCallback invokeCallback) throws InterruptedException, + RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; + + + void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) + throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, + RemotingSendRequestException; + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java new file mode 100644 index 0000000..cddac3e --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.remoting; + +public interface RemotingService { + void start(); + + + void shutdown(); + + + void registerRPCHook(RPCHook rpcHook); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java new file mode 100644 index 0000000..4ca077d --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author shijia.wxr + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE}) +public @interface CFNotNull { +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java new file mode 100644 index 0000000..1318854 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author shijia.wxr + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE}) +public @interface CFNullable { +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java new file mode 100644 index 0000000..091224e --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.common; + +/** + * @author shijia.wxr + */ +public class Pair<T1, T2> { + private T1 object1; + private T2 object2; + + + public Pair(T1 object1, T2 object2) { + this.object1 = object1; + this.object2 = object2; + } + + + public T1 getObject1() { + return object1; + } + + + public void setObject1(T1 object1) { + this.object1 = object1; + } + + + public T2 getObject2() { + return object2; + } + + + public void setObject2(T2 object2) { + this.object2 = object2; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java new file mode 100644 index 0000000..9dcdd83 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.common; + +import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.Channel; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + + +/** + * @author shijia.wxr + */ +public class RemotingHelper { + public static final String ROCKETMQ_REMOTING = "RocketmqRemoting"; + public static final String DEFAULT_CHARSET = "UTF-8"; + + public static String exceptionSimpleDesc(final Throwable e) { + StringBuffer sb = new StringBuffer(); + if (e != null) { + sb.append(e.toString()); + + StackTraceElement[] stackTrace = e.getStackTrace(); + if (stackTrace != null && stackTrace.length > 0) { + StackTraceElement elment = stackTrace[0]; + sb.append(", "); + sb.append(elment.toString()); + } + } + + return sb.toString(); + } + + public static SocketAddress string2SocketAddress(final String addr) { + String[] s = addr.split(":"); + InetSocketAddress isa = new InetSocketAddress(s[0], Integer.parseInt(s[1])); + return isa; + } + + public static RemotingCommand invokeSync(final String addr, final RemotingCommand request, + final long timeoutMillis) throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException { + long beginTime = System.currentTimeMillis(); + SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr); + SocketChannel socketChannel = RemotingUtil.connect(socketAddress); + if (socketChannel != null) { + boolean sendRequestOK = false; + + try { + + socketChannel.configureBlocking(true); + + //bugfix http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 + socketChannel.socket().setSoTimeout((int) timeoutMillis); + + + ByteBuffer byteBufferRequest = request.encode(); + while (byteBufferRequest.hasRemaining()) { + int length = socketChannel.write(byteBufferRequest); + if (length > 0) { + if (byteBufferRequest.hasRemaining()) { + if ((System.currentTimeMillis() - beginTime) > timeoutMillis) { + + throw new RemotingSendRequestException(addr); + } + } + } else { + throw new RemotingSendRequestException(addr); + } + + + Thread.sleep(1); + } + + sendRequestOK = true; + + ByteBuffer byteBufferSize = ByteBuffer.allocate(4); + while (byteBufferSize.hasRemaining()) { + int length = socketChannel.read(byteBufferSize); + if (length > 0) { + if (byteBufferSize.hasRemaining()) { + if ((System.currentTimeMillis() - beginTime) > timeoutMillis) { + + throw new RemotingTimeoutException(addr, timeoutMillis); + } + } + } else { + throw new RemotingTimeoutException(addr, timeoutMillis); + } + + + Thread.sleep(1); + } + + int size = byteBufferSize.getInt(0); + ByteBuffer byteBufferBody = ByteBuffer.allocate(size); + while (byteBufferBody.hasRemaining()) { + int length = socketChannel.read(byteBufferBody); + if (length > 0) { + if (byteBufferBody.hasRemaining()) { + if ((System.currentTimeMillis() - beginTime) > timeoutMillis) { + + throw new RemotingTimeoutException(addr, timeoutMillis); + } + } + } else { + throw new RemotingTimeoutException(addr, timeoutMillis); + } + + + Thread.sleep(1); + } + + + byteBufferBody.flip(); + return RemotingCommand.decode(byteBufferBody); + } catch (IOException e) { + e.printStackTrace(); + + if (sendRequestOK) { + throw new RemotingTimeoutException(addr, timeoutMillis); + } else { + throw new RemotingSendRequestException(addr); + } + } finally { + try { + socketChannel.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } else { + throw new RemotingConnectException(addr); + } + } + + + public static String parseChannelRemoteAddr(final Channel channel) { + if (null == channel) { + return ""; + } + SocketAddress remote = channel.remoteAddress(); + final String addr = remote != null ? remote.toString() : ""; + + if (addr.length() > 0) { + int index = addr.lastIndexOf("/"); + if (index >= 0) { + return addr.substring(index + 1); + } + + return addr; + } + + return ""; + } + + + public static String parseChannelRemoteName(final Channel channel) { + if (null == channel) { + return ""; + } + final InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress(); + if (remote != null) { + return remote.getAddress().getHostName(); + } + return ""; + } + + + public static String parseSocketAddressAddr(SocketAddress socketAddress) { + if (socketAddress != null) { + final String addr = socketAddress.toString(); + + if (addr.length() > 0) { + return addr.substring(1); + } + } + return ""; + } + + + public static String parseSocketAddressName(SocketAddress socketAddress) { + + final InetSocketAddress addrs = (InetSocketAddress) socketAddress; + if (addrs != null) { + return addrs.getAddress().getHostName(); + } + return ""; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java new file mode 100644 index 0000000..af2348f --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.common; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.SelectorProvider; +import java.util.ArrayList; +import java.util.Enumeration; + + +/** + * @author shijia.wxr + */ +public class RemotingUtil { + public static final String OS_NAME = System.getProperty("os.name"); + + private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static boolean isLinuxPlatform = false; + private static boolean isWindowsPlatform = false; + + static { + if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("linux") >= 0) { + isLinuxPlatform = true; + } + + if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("windows") >= 0) { + isWindowsPlatform = true; + } + } + + public static boolean isWindowsPlatform() { + return isWindowsPlatform; + } + + public static Selector openSelector() throws IOException { + Selector result = null; + + if (isLinuxPlatform()) { + try { + final Class<?> providerClazz = Class.forName("sun.nio.ch.EPollSelectorProvider"); + if (providerClazz != null) { + try { + final Method method = providerClazz.getMethod("provider"); + if (method != null) { + final SelectorProvider selectorProvider = (SelectorProvider) method.invoke(null); + if (selectorProvider != null) { + result = selectorProvider.openSelector(); + } + } + } catch (final Exception e) { + log.warn("Open ePoll Selector for linux platform exception", e); + } + } + } catch (final Exception e) { + // ignore + } + } + + if (result == null) { + result = Selector.open(); + } + + return result; + } + + public static boolean isLinuxPlatform() { + return isLinuxPlatform; + } + + public static String getLocalAddress() { + try { + // Traversal Network interface to get the first non-loopback and non-private address + Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces(); + ArrayList<String> ipv4Result = new ArrayList<String>(); + ArrayList<String> ipv6Result = new ArrayList<String>(); + while (enumeration.hasMoreElements()) { + final NetworkInterface networkInterface = enumeration.nextElement(); + final Enumeration<InetAddress> en = networkInterface.getInetAddresses(); + while (en.hasMoreElements()) { + final InetAddress address = en.nextElement(); + if (!address.isLoopbackAddress()) { + if (address instanceof Inet6Address) { + ipv6Result.add(normalizeHostAddress(address)); + } else { + ipv4Result.add(normalizeHostAddress(address)); + } + } + } + } + + // prefer ipv4 + if (!ipv4Result.isEmpty()) { + for (String ip : ipv4Result) { + if (ip.startsWith("127.0") || ip.startsWith("192.168")) { + continue; + } + + return ip; + } + + return ipv4Result.get(ipv4Result.size() - 1); + } else if (!ipv6Result.isEmpty()) { + return ipv6Result.get(0); + } + //If failed to find,fall back to localhost + final InetAddress localHost = InetAddress.getLocalHost(); + return normalizeHostAddress(localHost); + } catch (SocketException e) { + e.printStackTrace(); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + return null; + } + + + public static String normalizeHostAddress(final InetAddress localHost) { + if (localHost instanceof Inet6Address) { + return "[" + localHost.getHostAddress() + "]"; + } else { + return localHost.getHostAddress(); + } + } + + public static SocketAddress string2SocketAddress(final String addr) { + String[] s = addr.split(":"); + InetSocketAddress isa = new InetSocketAddress(s[0], Integer.parseInt(s[1])); + return isa; + } + + + public static String socketAddress2String(final SocketAddress addr) { + StringBuilder sb = new StringBuilder(); + InetSocketAddress inetSocketAddress = (InetSocketAddress) addr; + sb.append(inetSocketAddress.getAddress().getHostAddress()); + sb.append(":"); + sb.append(inetSocketAddress.getPort()); + return sb.toString(); + } + + + public static SocketChannel connect(SocketAddress remote) { + return connect(remote, 1000 * 5); + } + + + public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) { + SocketChannel sc = null; + try { + sc = SocketChannel.open(); + sc.configureBlocking(true); + sc.socket().setSoLinger(false, -1); + sc.socket().setTcpNoDelay(true); + sc.socket().setReceiveBufferSize(1024 * 64); + sc.socket().setSendBufferSize(1024 * 64); + sc.socket().connect(remote, timeoutMillis); + sc.configureBlocking(false); + return sc; + } catch (Exception e) { + if (sc != null) { + try { + sc.close(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } + + return null; + } + + + public static void closeChannel(Channel channel) { + final String addrRemote = RemotingHelper.parseChannelRemoteAddr(channel); + channel.close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + log.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote, + future.isSuccess()); + } + }); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java new file mode 100644 index 0000000..c24e8b3 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.common; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * @author shijia.wxr + */ +public class SemaphoreReleaseOnlyOnce { + private final AtomicBoolean released = new AtomicBoolean(false); + private final Semaphore semaphore; + + + public SemaphoreReleaseOnlyOnce(Semaphore semaphore) { + this.semaphore = semaphore; + } + + + public void release() { + if (this.semaphore != null) { + if (this.released.compareAndSet(false, true)) { + this.semaphore.release(); + } + } + } + + + public Semaphore getSemaphore() { + return semaphore; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java new file mode 100644 index 0000000..365c670 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.common; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Base class for background thread + * + * @author shijia.wxr + * + */ +public abstract class ServiceThread implements Runnable { + private static final Logger STLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static final long JOIN_TIME = 90 * 1000; + protected final Thread thread; + protected volatile boolean hasNotified = false; + protected volatile boolean stopped = false; + + + public ServiceThread() { + this.thread = new Thread(this, this.getServiceName()); + } + + + public abstract String getServiceName(); + + + public void start() { + this.thread.start(); + } + + + public void shutdown() { + this.shutdown(false); + } + + public void shutdown(final boolean interrupt) { + this.stopped = true; + STLOG.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); + synchronized (this) { + if (!this.hasNotified) { + this.hasNotified = true; + this.notify(); + } + } + + try { + if (interrupt) { + this.thread.interrupt(); + } + + long beginTime = System.currentTimeMillis(); + this.thread.join(this.getJointime()); + long eclipseTime = System.currentTimeMillis() - beginTime; + STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " + + this.getJointime()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public long getJointime() { + return JOIN_TIME; + } + + public void stop() { + this.stop(false); + } + + public void stop(final boolean interrupt) { + this.stopped = true; + STLOG.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); + synchronized (this) { + if (!this.hasNotified) { + this.hasNotified = true; + this.notify(); + } + } + + if (interrupt) { + this.thread.interrupt(); + } + } + + public void makeStop() { + this.stopped = true; + STLOG.info("makestop thread " + this.getServiceName()); + } + + public void wakeup() { + synchronized (this) { + if (!this.hasNotified) { + this.hasNotified = true; + this.notify(); + } + } + } + + protected void waitForRunning(long interval) { + synchronized (this) { + if (this.hasNotified) { + this.hasNotified = false; + this.onWaitEnd(); + return; + } + + try { + this.wait(interval); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + this.hasNotified = false; + this.onWaitEnd(); + } + } + } + + protected void onWaitEnd() { + } + + public boolean isStopped() { + return stopped; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java new file mode 100644 index 0000000..fe5cab9 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.exception; + +/** + * @author shijia.wxr + */ +public class RemotingCommandException extends RemotingException { + private static final long serialVersionUID = -6061365915274953096L; + + + public RemotingCommandException(String message) { + super(message, null); + } + + + public RemotingCommandException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java new file mode 100644 index 0000000..5c546bd --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.exception; + +/** + * @author shijia.wxr + */ +public class RemotingConnectException extends RemotingException { + private static final long serialVersionUID = -5565366231695911316L; + + + public RemotingConnectException(String addr) { + this(addr, null); + } + + + public RemotingConnectException(String addr, Throwable cause) { + super("connect to <" + addr + "> failed", cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java new file mode 100644 index 0000000..2c4b672 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.exception; + +/** + * @author shijia.wxr + */ +public class RemotingException extends Exception { + private static final long serialVersionUID = -5690687334570505110L; + + + public RemotingException(String message) { + super(message); + } + + + public RemotingException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java new file mode 100644 index 0000000..e29e1a2 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.exception; + +/** + * @author shijia.wxr + */ +public class RemotingSendRequestException extends RemotingException { + private static final long serialVersionUID = 5391285827332471674L; + + + public RemotingSendRequestException(String addr) { + this(addr, null); + } + + + public RemotingSendRequestException(String addr, Throwable cause) { + super("send request to <" + addr + "> failed", cause); + } +}
