http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java new file mode 100644 index 0000000..1749e91 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker; + +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.joran.JoranConfigurator; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.netty.NettySystemConfig; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * @author shijia.wxr + */ +public class BrokerStartup { + public static Properties properties = null; + public static CommandLine commandLine = null; + public static String configFile = null; + public static Logger log; + + public static void main(String[] args) { + start(createBrokerController(args)); + } + + public static BrokerController start(BrokerController controller) { + try { + controller.start(); + String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", " + + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); + + if (null != controller.getBrokerConfig().getNamesrvAddr()) { + tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr(); + } + + log.info(tip); + return controller; + } catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); + } + + return null; + } + + public static BrokerController createBrokerController(String[] args) { + System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + + if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { + NettySystemConfig.socketSndbufSize = 131072; + } + + if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { + NettySystemConfig.socketRcvbufSize = 131072; + } + + try { + //PackageConflictDetect.detectFastjson(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), + new PosixParser()); + if (null == commandLine) { + System.exit(-1); + } + + final BrokerConfig brokerConfig = new BrokerConfig(); + final NettyServerConfig nettyServerConfig = new NettyServerConfig(); + final NettyClientConfig nettyClientConfig = new NettyClientConfig(); + nettyServerConfig.setListenPort(10911); + final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + + if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { + int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; + messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); + } + + if (commandLine.hasOption('p')) { + MixAll.printObjectProperties(null, brokerConfig); + MixAll.printObjectProperties(null, nettyServerConfig); + MixAll.printObjectProperties(null, nettyClientConfig); + MixAll.printObjectProperties(null, messageStoreConfig); + System.exit(0); + } else if (commandLine.hasOption('m')) { + MixAll.printObjectProperties(null, brokerConfig, true); + MixAll.printObjectProperties(null, nettyServerConfig, true); + MixAll.printObjectProperties(null, nettyClientConfig, true); + MixAll.printObjectProperties(null, messageStoreConfig, true); + System.exit(0); + } + + if (commandLine.hasOption('c')) { + String file = commandLine.getOptionValue('c'); + if (file != null) { + configFile = file; + InputStream in = new BufferedInputStream(new FileInputStream(file)); + properties = new Properties(); + properties.load(in); + + parsePropertie2SystemEnv(properties); + MixAll.properties2Object(properties, brokerConfig); + MixAll.properties2Object(properties, nettyServerConfig); + MixAll.properties2Object(properties, nettyClientConfig); + MixAll.properties2Object(properties, messageStoreConfig); + + BrokerPathConfigHelper.setBrokerConfigPath(file); + in.close(); + } + } + + MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); + + if (null == brokerConfig.getRocketmqHome()) { + System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV + + " variable in your environment to match the location of the RocketMQ installation"); + System.exit(-2); + } + + String namesrvAddr = brokerConfig.getNamesrvAddr(); + if (null != namesrvAddr) { + try { + String[] addrArray = namesrvAddr.split(";"); + if (addrArray != null) { + for (String addr : addrArray) { + RemotingUtil.string2SocketAddress(addr); + } + } + } catch (Exception e) { + System.out.printf( + "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", + namesrvAddr); + System.exit(-3); + } + } + + + switch (messageStoreConfig.getBrokerRole()) { + case ASYNC_MASTER: + case SYNC_MASTER: + brokerConfig.setBrokerId(MixAll.MASTER_ID); + break; + case SLAVE: + if (brokerConfig.getBrokerId() <= 0) { + System.out.printf("Slave's brokerId must be > 0"); + System.exit(-3); + } + + break; + default: + break; + } + + messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + JoranConfigurator configurator = new JoranConfigurator(); + configurator.setContext(lc); + lc.reset(); + configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"); + log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + MixAll.printObjectProperties(log, brokerConfig); + MixAll.printObjectProperties(log, nettyServerConfig); + MixAll.printObjectProperties(log, nettyClientConfig); + MixAll.printObjectProperties(log, messageStoreConfig); + + final BrokerController controller = new BrokerController(// + brokerConfig, // + nettyServerConfig, // + nettyClientConfig, // + messageStoreConfig); + // remember all configs to prevent discard + controller.getConfiguration().registerConfig(properties); + + boolean initResult = controller.initialize(); + if (!initResult) { + controller.shutdown(); + System.exit(-3); + } + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + private volatile boolean hasShutdown = false; + private AtomicInteger shutdownTimes = new AtomicInteger(0); + + @Override + public void run() { + synchronized (this) { + log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); + if (!this.hasShutdown) { + this.hasShutdown = true; + long begineTime = System.currentTimeMillis(); + controller.shutdown(); + long consumingTimeTotal = System.currentTimeMillis() - begineTime; + log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); + } + } + } + }, "ShutdownHook")); + + return controller; + } catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); + } + + return null; + } + + private static void parsePropertie2SystemEnv(Properties properties) { + if (properties == null) { + return; + } + String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", "jmenv.tbsite.net"); + String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", "nsaddr"); + System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain); + System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup); + } + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("c", "configFile", true, "Broker config properties file"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("p", "printConfigItem", false, "Print all config item"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("m", "printImportantConfig", false, "Print important config item"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java new file mode 100644 index 0000000..e15a22a --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.client; + +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import io.netty.channel.Channel; + + +/** + * @author shijia.wxr + */ +public class ClientChannelInfo { + private final Channel channel; + private final String clientId; + private final LanguageCode language; + private final int version; + private volatile long lastUpdateTimestamp = System.currentTimeMillis(); + + + public ClientChannelInfo(Channel channel) { + this(channel, null, null, 0); + } + + + public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) { + this.channel = channel; + this.clientId = clientId; + this.language = language; + this.version = version; + } + + + public Channel getChannel() { + return channel; + } + + + public String getClientId() { + return clientId; + } + + + public LanguageCode getLanguage() { + return language; + } + + + public int getVersion() { + return version; + } + + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } + + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((channel == null) ? 0 : channel.hashCode()); + result = prime * result + ((clientId == null) ? 0 : clientId.hashCode()); + result = prime * result + ((language == null) ? 0 : language.hashCode()); + result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32)); + result = prime * result + version; + return result; + } + + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ClientChannelInfo other = (ClientChannelInfo) obj; + if (channel == null) { + if (other.channel != null) + return false; + } else if (this.channel != other.channel) { + return false; + } + + return true; + } + + + @Override + public String toString() { + return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language + + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java new file mode 100644 index 0000000..2d1ad9b --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.client; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.remoting.ChannelEventListener; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +/** + * @author shijia.wxr + */ +public class ClientHousekeepingService implements ChannelEventListener { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final BrokerController brokerController; + + private ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread")); + + + public ClientHousekeepingService(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + + public void start() { + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + ClientHousekeepingService.this.scanExceptionChannel(); + } catch (Exception e) { + log.error("", e); + } + } + }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); + } + + private void scanExceptionChannel() { + this.brokerController.getProducerManager().scanNotActiveChannel(); + this.brokerController.getConsumerManager().scanNotActiveChannel(); + this.brokerController.getFilterServerManager().scanNotActiveChannel(); + } + + public void shutdown() { + this.scheduledExecutorService.shutdown(); + } + + @Override + public void onChannelConnect(String remoteAddr, Channel channel) { + + } + + + @Override + public void onChannelClose(String remoteAddr, Channel channel) { + this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); + } + + + @Override + public void onChannelException(String remoteAddr, Channel channel) { + this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); + } + + + @Override + public void onChannelIdle(String remoteAddr, Channel channel) { + this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java new file mode 100644 index 0000000..10795f5 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.client; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + */ +public class ConsumerGroupInfo { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final String groupName; + private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = + new ConcurrentHashMap<String, SubscriptionData>(); + private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = + new ConcurrentHashMap<Channel, ClientChannelInfo>(16); + private volatile ConsumeType consumeType; + private volatile MessageModel messageModel; + private volatile ConsumeFromWhere consumeFromWhere; + private volatile long lastUpdateTimestamp = System.currentTimeMillis(); + + + public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel, + ConsumeFromWhere consumeFromWhere) { + this.groupName = groupName; + this.consumeType = consumeType; + this.messageModel = messageModel; + this.consumeFromWhere = consumeFromWhere; + } + + + public ClientChannelInfo findChannel(final String clientId) { + Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<Channel, ClientChannelInfo> next = it.next(); + if (next.getValue().getClientId().equals(clientId)) { + return next.getValue(); + } + } + + return null; + } + + + public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() { + return subscriptionTable; + } + + + public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() { + return channelInfoTable; + } + + + public List<Channel> getAllChannel() { + List<Channel> result = new ArrayList<Channel>(); + + result.addAll(this.channelInfoTable.keySet()); + + return result; + } + + + public List<String> getAllClientId() { + List<String> result = new ArrayList<String>(); + + Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator(); + + while (it.hasNext()) { + Entry<Channel, ClientChannelInfo> entry = it.next(); + ClientChannelInfo clientChannelInfo = entry.getValue(); + result.add(clientChannelInfo.getClientId()); + } + + return result; + } + + + public void unregisterChannel(final ClientChannelInfo clientChannelInfo) { + ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel()); + if (old != null) { + log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString()); + } + } + + + public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { + final ClientChannelInfo info = this.channelInfoTable.remove(channel); + if (info != null) { + log.warn( + "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}", + info.toString(), groupName); + return true; + } + + return false; + } + + public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType, + MessageModel messageModel, ConsumeFromWhere consumeFromWhere) { + boolean updated = false; + this.consumeType = consumeType; + this.messageModel = messageModel; + this.consumeFromWhere = consumeFromWhere; + + ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel()); + if (null == infoOld) { + ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew); + if (null == prev) { + log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType, + messageModel, infoNew.toString()); + updated = true; + } + + infoOld = infoNew; + } else { + if (!infoOld.getClientId().equals(infoNew.getClientId())) { + log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", + this.groupName, + infoOld.toString(), + infoNew.toString()); + this.channelInfoTable.put(infoNew.getChannel(), infoNew); + } + } + + this.lastUpdateTimestamp = System.currentTimeMillis(); + infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp); + + return updated; + } + + + public boolean updateSubscription(final Set<SubscriptionData> subList) { + boolean updated = false; + + for (SubscriptionData sub : subList) { + SubscriptionData old = this.subscriptionTable.get(sub.getTopic()); + if (old == null) { + SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub); + if (null == prev) { + updated = true; + log.info("subscription changed, add new topic, group: {} {}", + this.groupName, + sub.toString()); + } + } else if (sub.getSubVersion() > old.getSubVersion()) { + if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) { + log.info("subscription changed, group: {} OLD: {} NEW: {}", + this.groupName, + old.toString(), + sub.toString() + ); + } + + this.subscriptionTable.put(sub.getTopic(), sub); + } + } + + + Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, SubscriptionData> next = it.next(); + String oldTopic = next.getKey(); + + boolean exist = false; + for (SubscriptionData sub : subList) { + if (sub.getTopic().equals(oldTopic)) { + exist = true; + break; + } + } + + if (!exist) { + log.warn("subscription changed, group: {} remove topic {} {}", + this.groupName, + oldTopic, + next.getValue().toString() + ); + + it.remove(); + updated = true; + } + } + + this.lastUpdateTimestamp = System.currentTimeMillis(); + + return updated; + } + + + public Set<String> getSubscribeTopics() { + return subscriptionTable.keySet(); + } + + + public SubscriptionData findSubscriptionData(final String topic) { + return this.subscriptionTable.get(topic); + } + + + public ConsumeType getConsumeType() { + return consumeType; + } + + + public void setConsumeType(ConsumeType consumeType) { + this.consumeType = consumeType; + } + + + public MessageModel getMessageModel() { + return messageModel; + } + + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + + public String getGroupName() { + return groupName; + } + + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } + + + public ConsumeFromWhere getConsumeFromWhere() { + return consumeFromWhere; + } + + + public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { + this.consumeFromWhere = consumeFromWhere; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java new file mode 100644 index 0000000..e8d23db --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.client; + +import io.netty.channel.Channel; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public interface ConsumerIdsChangeListener { + public void consumerIdsChanged(final String group, final List<Channel> channels); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java new file mode 100644 index 0000000..561fec6 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.client; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + */ +public class ConsumerManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; + private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable = + new ConcurrentHashMap<String, ConsumerGroupInfo>(1024); + private final ConsumerIdsChangeListener consumerIdsChangeListener; + + public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) { + this.consumerIdsChangeListener = consumerIdsChangeListener; + } + + public ClientChannelInfo findChannel(final String group, final String clientId) { + ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); + if (consumerGroupInfo != null) { + return consumerGroupInfo.findChannel(clientId); + } + return null; + } + + public SubscriptionData findSubscriptionData(final String group, final String topic) { + ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group); + if (consumerGroupInfo != null) { + return consumerGroupInfo.findSubscriptionData(topic); + } + + return null; + } + + public ConsumerGroupInfo getConsumerGroupInfo(final String group) { + return this.consumerTable.get(group); + } + + public int findSubscriptionDataCount(final String group) { + ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group); + if (consumerGroupInfo != null) { + return consumerGroupInfo.getSubscriptionTable().size(); + } + + return 0; + } + + public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { + Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConsumerGroupInfo> next = it.next(); + ConsumerGroupInfo info = next.getValue(); + boolean removed = info.doChannelCloseEvent(remoteAddr, channel); + if (removed) { + if (info.getChannelInfoTable().isEmpty()) { + ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey()); + if (remove != null) { + log.info("unregister consumer ok, no any connection, and remove consumer group, {}", + next.getKey()); + } + } + + this.consumerIdsChangeListener.consumerIdsChanged(next.getKey(), info.getAllChannel()); + } + } + } + + public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, + ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, + final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { + + ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); + if (null == consumerGroupInfo) { + ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); + ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); + consumerGroupInfo = prev != null ? prev : tmp; + } + + boolean r1 = + consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, + consumeFromWhere); + boolean r2 = consumerGroupInfo.updateSubscription(subList); + + if (r1 || r2) { + if (isNotifyConsumerIdsChangedEnable) { + this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel()); + } + } + + return r1 || r2; + } + + public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) { + ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); + if (null != consumerGroupInfo) { + consumerGroupInfo.unregisterChannel(clientChannelInfo); + if (consumerGroupInfo.getChannelInfoTable().isEmpty()) { + ConsumerGroupInfo remove = this.consumerTable.remove(group); + if (remove != null) { + log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group); + } + } + if (isNotifyConsumerIdsChangedEnable) { + this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel()); + } + } + } + + public void scanNotActiveChannel() { + Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConsumerGroupInfo> next = it.next(); + String group = next.getKey(); + ConsumerGroupInfo consumerGroupInfo = next.getValue(); + ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = + consumerGroupInfo.getChannelInfoTable(); + + Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator(); + while (itChannel.hasNext()) { + Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next(); + ClientChannelInfo clientChannelInfo = nextChannel.getValue(); + long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp(); + if (diff > CHANNEL_EXPIRED_TIMEOUT) { + log.warn( + "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}", + RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group); + RemotingUtil.closeChannel(clientChannelInfo.getChannel()); + itChannel.remove(); + } + } + + if (channelInfoTable.isEmpty()) { + log.warn( + "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}", + group); + it.remove(); + } + } + } + + public HashSet<String> queryTopicConsumeByWho(final String topic) { + HashSet<String> groups = new HashSet<String>(); + Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConsumerGroupInfo> entry = it.next(); + ConcurrentHashMap<String, SubscriptionData> subscriptionTable = + entry.getValue().getSubscriptionTable(); + if (subscriptionTable.containsKey(topic)) { + groups.add(entry.getKey()); + } + } + return groups; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java new file mode 100644 index 0000000..501d665 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.client; + +import org.apache.rocketmq.broker.BrokerController; +import io.netty.channel.Channel; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener { + private final BrokerController brokerController; + + + public DefaultConsumerIdsChangeListener(BrokerController brokerController) { + this.brokerController = brokerController; + } + + + @Override + public void consumerIdsChanged(String group, List<Channel> channels) { + if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) { + for (Channel chl : channels) { + this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java new file mode 100644 index 0000000..6656ab0 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.client; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * @author shijia.wxr + */ +public class ProducerManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final long LOCK_TIMEOUT_MILLIS = 3000; + private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; + private final Lock groupChannelLock = new ReentrantLock(); + private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable = + new HashMap<String, HashMap<Channel, ClientChannelInfo>>(); + + + public ProducerManager() { + } + + + public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() { + HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable = + new HashMap<String, HashMap<Channel, ClientChannelInfo>>(); + try { + if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + newGroupChannelTable.putAll(groupChannelTable); + } finally { + groupChannelLock.unlock(); + } + } + } catch (InterruptedException e) { + log.error("", e); + } + return newGroupChannelTable; + } + + + public void scanNotActiveChannel() { + try { + if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable + .entrySet()) { + final String group = entry.getKey(); + final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue(); + + Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator(); + while (it.hasNext()) { + Entry<Channel, ClientChannelInfo> item = it.next(); + // final Integer id = item.getKey(); + final ClientChannelInfo info = item.getValue(); + + long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp(); + if (diff > CHANNEL_EXPIRED_TIMEOUT) { + it.remove(); + log.warn( + "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", + RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); + RemotingUtil.closeChannel(info.getChannel()); + } + } + } + } finally { + this.groupChannelLock.unlock(); + } + } else { + log.warn("ProducerManager scanNotActiveChannel lock timeout"); + } + } catch (InterruptedException e) { + log.error("", e); + } + } + + + public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { + if (channel != null) { + try { + if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable + .entrySet()) { + final String group = entry.getKey(); + final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable = + entry.getValue(); + final ClientChannelInfo clientChannelInfo = + clientChannelInfoTable.remove(channel); + if (clientChannelInfo != null) { + log.info( + "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", + clientChannelInfo.toString(), remoteAddr, group); + } + + } + } finally { + this.groupChannelLock.unlock(); + } + } else { + log.warn("ProducerManager doChannelCloseEvent lock timeout"); + } + } catch (InterruptedException e) { + log.error("", e); + } + } + } + + + public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { + try { + ClientChannelInfo clientChannelInfoFound = null; + + if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); + if (null == channelTable) { + channelTable = new HashMap<Channel, ClientChannelInfo>(); + this.groupChannelTable.put(group, channelTable); + } + + clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel()); + if (null == clientChannelInfoFound) { + channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo); + log.info("new producer connected, group: {} channel: {}", group, + clientChannelInfo.toString()); + } + } finally { + this.groupChannelLock.unlock(); + } + + if (clientChannelInfoFound != null) { + clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis()); + } + } else { + log.warn("ProducerManager registerProducer lock timeout"); + } + } catch (InterruptedException e) { + log.error("", e); + } + } + + + public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) { + try { + if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); + if (null != channelTable && !channelTable.isEmpty()) { + ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); + if (old != null) { + log.info("unregister a producer[{}] from groupChannelTable {}", group, + clientChannelInfo.toString()); + } + + if (channelTable.isEmpty()) { + this.groupChannelTable.remove(group); + log.info("unregister a producer group[{}] from groupChannelTable", group); + } + } + } finally { + this.groupChannelLock.unlock(); + } + } else { + log.warn("ProducerManager unregisterProducer lock timeout"); + } + } catch (InterruptedException e) { + log.error("", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java new file mode 100644 index 0000000..7d7064a --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.client.net; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.broker.pagecache.OneMessageTransfer; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.MessageQueueForC; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody; +import org.apache.rocketmq.common.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.common.protocol.body.ResetOffsetBodyForC; +import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; +import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader; +import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.FileRegion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + */ +public class Broker2Client { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final BrokerController brokerController; + + public Broker2Client(BrokerController brokerController) { + this.brokerController = brokerController; + } + + public void checkProducerTransactionState( + final Channel channel, + final CheckTransactionStateRequestHeader requestHeader, + final SelectMappedBufferResult selectMappedBufferResult) { + RemotingCommand request = + RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader); + request.markOnewayRPC(); + + try { + FileRegion fileRegion = + new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()), + selectMappedBufferResult); + channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + selectMappedBufferResult.release(); + if (!future.isSuccess()) { + log.error("invokeProducer failed,", future.cause()); + } + } + }); + } catch (Throwable e) { + log.error("invokeProducer exception", e); + selectMappedBufferResult.release(); + } + } + + public RemotingCommand callClient(final Channel channel, + final RemotingCommand request + ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { + return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000); + } + + public void notifyConsumerIdsChanged( + final Channel channel, + final String consumerGroup) { + if (null == consumerGroup) { + log.error("notifyConsumerIdsChanged consumerGroup is null"); + return; + } + + NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + RemotingCommand request = + RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); + + try { + this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); + } catch (Exception e) { + log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage()); + } + } + + public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) { + return resetOffset(topic, group, timeStamp, isForce, false); + } + + public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce, + boolean isC) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); + if (null == topicConfig) { + log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic); + return response; + } + + Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); + + for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { + MessageQueue mq = new MessageQueue(); + mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); + mq.setTopic(topic); + mq.setQueueId(i); + + long consumerOffset = + this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i); + if (-1 == consumerOffset) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("THe consumer group <%s> not exist", group)); + return response; + } + + long timeStampOffset; + if (timeStamp == -1) { + + timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); + } else { + timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp); + } + + if (timeStampOffset < 0) { + log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset); + timeStampOffset = 0; + } + + if (isForce || timeStampOffset < consumerOffset) { + offsetTable.put(mq, timeStampOffset); + } else { + offsetTable.put(mq, consumerOffset); + } + } + + ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setGroup(group); + requestHeader.setTimestamp(timeStamp); + RemotingCommand request = + RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader); + if (isC) { + // c++ language + ResetOffsetBodyForC body = new ResetOffsetBodyForC(); + List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable); + body.setOffsetTable(offsetList); + request.setBody(body.encode()); + } else { + // other language + ResetOffsetBody body = new ResetOffsetBody(); + body.setOffsetTable(offsetTable); + request.setBody(body.encode()); + } + + ConsumerGroupInfo consumerGroupInfo = + this.brokerController.getConsumerManager().getConsumerGroupInfo(group); + + if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) { + ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = + consumerGroupInfo.getChannelInfoTable(); + for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) { + int version = entry.getValue().getVersion(); + if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { + try { + this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000); + log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", + new Object[]{topic, group, entry.getValue().getClientId()}); + } catch (Exception e) { + log.error("[reset-offset] reset offset exception. topic={}, group={}", + new Object[]{topic, group}, e); + } + } else { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("the client does not support this feature. version=" + + MQVersion.getVersionDesc(version)); + log.warn("[reset-offset] the client does not support this feature. version={}", + RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); + return response; + } + } + } else { + String errorInfo = + String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d", + requestHeader.getGroup(), + requestHeader.getTopic(), + requestHeader.getTimestamp()); + log.error(errorInfo); + response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); + response.setRemark(errorInfo); + return response; + } + response.setCode(ResponseCode.SUCCESS); + ResetOffsetBody resBody = new ResetOffsetBody(); + resBody.setOffsetTable(offsetTable); + response.setBody(resBody.encode()); + return response; + } + + private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) { + List<MessageQueueForC> list = new ArrayList<MessageQueueForC>(); + for (Entry<MessageQueue, Long> entry : table.entrySet()) { + MessageQueue mq = entry.getKey(); + MessageQueueForC tmp = + new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue()); + list.add(tmp); + } + return list; + } + + public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) { + final RemotingCommand result = RemotingCommand.createResponseCommand(null); + + GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setGroup(group); + RemotingCommand request = + RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, + requestHeader); + + Map<String, Map<MessageQueue, Long>> consumerStatusTable = + new HashMap<String, Map<MessageQueue, Long>>(); + ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = + this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable(); + if (null == channelInfoTable || channelInfoTable.isEmpty()) { + result.setCode(ResponseCode.SYSTEM_ERROR); + result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group)); + return result; + } + + for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) { + int version = entry.getValue().getVersion(); + String clientId = entry.getValue().getClientId(); + if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { + result.setCode(ResponseCode.SYSTEM_ERROR); + result.setRemark("the client does not support this feature. version=" + + MQVersion.getVersionDesc(version)); + log.warn("[get-consumer-status] the client does not support this feature. version={}", + RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); + return result; + } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) { + try { + RemotingCommand response = + this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + if (response.getBody() != null) { + GetConsumerStatusBody body = + GetConsumerStatusBody.decode(response.getBody(), + GetConsumerStatusBody.class); + + consumerStatusTable.put(clientId, body.getMessageQueueTable()); + log.info( + "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}", + new Object[]{topic, group, clientId}); + } + } + default: + break; + } + } catch (Exception e) { + log.error( + "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}", + new Object[]{topic, group}, e); + } + + if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) { + break; + } + } + } + + result.setCode(ResponseCode.SUCCESS); + GetConsumerStatusBody resBody = new GetConsumerStatusBody(); + resBody.setConsumerTable(consumerStatusTable); + result.setBody(resBody.encode()); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java new file mode 100644 index 0000000..adb1819 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.client.rebalance; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * @author shijia.wxr + */ +public class RebalanceLockManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME); + private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( + "rocketmq.broker.rebalance.lockMaxLiveTime", "60000")); + private final Lock lock = new ReentrantLock(); + private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable = + new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024); + + public boolean tryLock(final String group, final MessageQueue mq, final String clientId) { + + if (!this.isLocked(group, mq, clientId)) { + try { + this.lock.lockInterruptibly(); + try { + ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); + if (null == groupValue) { + groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32); + this.mqLockTable.put(group, groupValue); + } + + LockEntry lockEntry = groupValue.get(mq); + if (null == lockEntry) { + lockEntry = new LockEntry(); + lockEntry.setClientId(clientId); + groupValue.put(mq, lockEntry); + log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", // + group, // + clientId, // + mq); + } + + if (lockEntry.isLocked(clientId)) { + lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); + return true; + } + + String oldClientId = lockEntry.getClientId(); + + + if (lockEntry.isExpired()) { + lockEntry.setClientId(clientId); + lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); + log.warn( + "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // + group, // + oldClientId, // + clientId, // + mq); + return true; + } + + + log.warn( + "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // + group, // + oldClientId, // + clientId, // + mq); + return false; + } finally { + this.lock.unlock(); + } + } catch (InterruptedException e) { + log.error("putMessage exception", e); + } + } else { + + } + + return true; + } + + private boolean isLocked(final String group, final MessageQueue mq, final String clientId) { + ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); + if (groupValue != null) { + LockEntry lockEntry = groupValue.get(mq); + if (lockEntry != null) { + boolean locked = lockEntry.isLocked(clientId); + if (locked) { + lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); + } + + return locked; + } + } + + return false; + } + + public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs, + final String clientId) { + Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size()); + Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size()); + + + for (MessageQueue mq : mqs) { + if (this.isLocked(group, mq, clientId)) { + lockedMqs.add(mq); + } else { + notLockedMqs.add(mq); + } + } + + if (!notLockedMqs.isEmpty()) { + try { + this.lock.lockInterruptibly(); + try { + ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); + if (null == groupValue) { + groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32); + this.mqLockTable.put(group, groupValue); + } + + + for (MessageQueue mq : notLockedMqs) { + LockEntry lockEntry = groupValue.get(mq); + if (null == lockEntry) { + lockEntry = new LockEntry(); + lockEntry.setClientId(clientId); + groupValue.put(mq, lockEntry); + log.info( + "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", // + group, // + clientId, // + mq); + } + + + if (lockEntry.isLocked(clientId)) { + lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); + lockedMqs.add(mq); + continue; + } + + String oldClientId = lockEntry.getClientId(); + + + if (lockEntry.isExpired()) { + lockEntry.setClientId(clientId); + lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); + log.warn( + "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // + group, // + oldClientId, // + clientId, // + mq); + lockedMqs.add(mq); + continue; + } + + + log.warn( + "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // + group, // + oldClientId, // + clientId, // + mq); + } + } finally { + this.lock.unlock(); + } + } catch (InterruptedException e) { + log.error("putMessage exception", e); + } + } + + return lockedMqs; + } + + public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) { + try { + this.lock.lockInterruptibly(); + try { + ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); + if (null != groupValue) { + for (MessageQueue mq : mqs) { + LockEntry lockEntry = groupValue.get(mq); + if (null != lockEntry) { + if (lockEntry.getClientId().equals(clientId)) { + groupValue.remove(mq); + log.info("unlockBatch, Group: {} {} {}", + group, + mq, + clientId); + } else { + log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}", + lockEntry.getClientId(), + group, + mq, + clientId); + } + } else { + log.warn("unlockBatch, but mq not locked, Group: {} {} {}", + group, + mq, + clientId); + } + } + } else { + log.warn("unlockBatch, group not exist, Group: {} {}", + group, + clientId); + } + } finally { + this.lock.unlock(); + } + } catch (InterruptedException e) { + log.error("putMessage exception", e); + } + } + + static class LockEntry { + private String clientId; + private volatile long lastUpdateTimestamp = System.currentTimeMillis(); + + + public String getClientId() { + return clientId; + } + + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } + + public boolean isLocked(final String clientId) { + boolean eq = this.clientId.equals(clientId); + return eq && !this.isExpired(); + } + + public boolean isExpired() { + boolean expired = + (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; + + return expired; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java new file mode 100644 index 0000000..5b86d99 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.filtersrv; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.BrokerStartup; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +public class FilterServerManager { + + public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000; + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable = + new ConcurrentHashMap<Channel, FilterServerInfo>(16); + private final BrokerController brokerController; + + private ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread")); + + public FilterServerManager(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + public void start() { + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + FilterServerManager.this.createFilterServer(); + } catch (Exception e) { + log.error("", e); + } + } + }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS); + } + + public void createFilterServer() { + int more = + this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size(); + String cmd = this.buildStartCommand(); + for (int i = 0; i < more; i++) { + FilterServerUtil.callShell(cmd, log); + } + } + + private String buildStartCommand() { + String config = ""; + if (BrokerStartup.configFile != null) { + config = String.format("-c %s", BrokerStartup.configFile); + } + + if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) { + config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr()); + } + + if (RemotingUtil.isWindowsPlatform()) { + return String.format("start /b %s\\bin\\mqfiltersrv.exe %s", + this.brokerController.getBrokerConfig().getRocketmqHome(), + config); + } else { + return String.format("sh %s/bin/startfsrv.sh %s", + this.brokerController.getBrokerConfig().getRocketmqHome(), + config); + } + } + + public void shutdown() { + this.scheduledExecutorService.shutdown(); + } + + public void registerFilterServer(final Channel channel, final String filterServerAddr) { + FilterServerInfo filterServerInfo = this.filterServerTable.get(channel); + if (filterServerInfo != null) { + filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis()); + } else { + filterServerInfo = new FilterServerInfo(); + filterServerInfo.setFilterServerAddr(filterServerAddr); + filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis()); + this.filterServerTable.put(channel, filterServerInfo); + log.info("Receive a New Filter Server<{}>", filterServerAddr); + } + } + + /** + + */ + public void scanNotActiveChannel() { + + Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<Channel, FilterServerInfo> next = it.next(); + long timestamp = next.getValue().getLastUpdateTimestamp(); + Channel channel = next.getKey(); + if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) { + log.info("The Filter Server<{}> expired, remove it", next.getKey()); + it.remove(); + RemotingUtil.closeChannel(channel); + } + } + } + + public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { + FilterServerInfo old = this.filterServerTable.remove(channel); + if (old != null) { + log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(), + remoteAddr); + } + } + + public List<String> buildNewFilterServerList() { + List<String> addr = new ArrayList<String>(); + Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<Channel, FilterServerInfo> next = it.next(); + addr.add(next.getValue().getFilterServerAddr()); + } + return addr; + } + + static class FilterServerInfo { + private String filterServerAddr; + private long lastUpdateTimestamp; + + + public String getFilterServerAddr() { + return filterServerAddr; + } + + + public void setFilterServerAddr(String filterServerAddr) { + this.filterServerAddr = filterServerAddr; + } + + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } + } +}
