http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java new file mode 100644 index 0000000..105cfff --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.filtersrv.processor;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullCallback;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.filter.FilterContext;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.RequestCode;
import com.alibaba.rocketmq.common.protocol.ResponseCode;
import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
import com.alibaba.rocketmq.filtersrv.FiltersrvController;
import com.alibaba.rocketmq.filtersrv.filter.FilterClassInfo;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.CommitLog;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;


/**
 * Request processor of the filter server. It handles two request codes:
 * registration of a message filter class, and pull requests, which are
 * forwarded through the controller's pull consumer and whose results are
 * passed through the registered filter before being written back to the
 * requesting channel.
 *
 * @author shijia.wxr
 */
public class DefaultRequestProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);

    private final FiltersrvController filtersrvController;


    public DefaultRequestProcessor(FiltersrvController filtersrvController) {
        this.filtersrvController = filtersrvController;
    }


    /**
     * Dispatches a request by its code.
     *
     * @param ctx channel the request arrived on
     * @param request the incoming command
     * @return the response for a filter class registration; {@code null} for a
     *         pull request (its response is written asynchronously by the pull
     *         callback) and for any other request code
     * @throws Exception propagated from the individual handlers
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }

        switch (request.getCode()) {
            case RequestCode.REGISTER_MESSAGE_FILTER_CLASS:
                return registerMessageFilterClass(ctx, request);
            case RequestCode.PULL_MESSAGE:
                return pullMessageForward(ctx, request);
            default:
                // Unknown request codes produce no response, as before.
                return null;
        }
    }

    /**
     * @return always {@code false}; this processor never rejects a request
     */
    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Registers the filter class carried in the request (header fields plus the
     * request body) with the filter class manager.
     *
     * @return a response with {@code SUCCESS}, or {@code SYSTEM_ERROR} and a
     *         remark describing the failure
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final RegisterMessageFilterClassRequestHeader requestHeader =
            (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);

        try {
            boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(requestHeader.getConsumerGroup(),
                requestHeader.getTopic(),
                requestHeader.getClassName(),
                requestHeader.getClassCRC(),
                request.getBody());
            if (!ok) {
                throw new Exception("registerFilterClass error");
            }
        } catch (Exception e) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(RemotingHelper.exceptionSimpleDesc(e));
            return response;
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /**
     * Forwards a pull request through the controller's pull consumer and answers
     * the requester from the pull callback.
     *
     * @return an error response when no usable filter class is registered for
     *         the consumer group and topic; otherwise {@code null}, because the
     *         response is written to the channel by the callback
     * @throws Exception if the header cannot be decoded or the pull cannot be issued
     */
    private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
        final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        final FilterContext filterContext = new FilterContext();
        filterContext.setConsumerGroup(requestHeader.getConsumerGroup());

        // The response is sent outside the normal return path, so it must carry
        // the request's opaque value itself.
        response.setOpaque(request.getOpaque());

        DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
        final FilterClassInfo findFilterClass =
            this.filtersrvController.getFilterClassManager()
                .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
        if (null == findFilterClass) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Find Filter class failed, not registered");
            return response;
        }

        if (null == findFilterClass.getMessageFilter()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Find Filter class failed, registered but no class");
            return response;
        }

        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

        MessageQueue mq = new MessageQueue();
        mq.setTopic(requestHeader.getTopic());
        mq.setQueueId(requestHeader.getQueueId());
        mq.setBrokerName(this.filtersrvController.getBrokerName());
        long offset = requestHeader.getQueueOffset();
        int maxNums = requestHeader.getMaxMsgNums();

        final PullCallback pullCallback = new PullCallback() {

            @Override
            public void onSuccess(PullResult pullResult) {
                responseHeader.setMaxOffset(pullResult.getMaxOffset());
                responseHeader.setMinOffset(pullResult.getMinOffset());
                responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
                response.setRemark(null);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        response.setCode(ResponseCode.SUCCESS);

                        List<MessageExt> msgListOK = new ArrayList<MessageExt>();
                        try {
                            for (MessageExt msg : pullResult.getMsgFoundList()) {
                                boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
                                if (match) {
                                    msgListOK.add(msg);
                                }
                            }

                            if (!msgListOK.isEmpty()) {
                                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
                                return;
                            } else {
                                // Everything was filtered out: ask the client to pull again.
                                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                            }
                        } catch (Throwable e) {
                            final String error =
                                String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
                                    requestHeader.getConsumerGroup(), requestHeader.getTopic());
                            log.error(error, e);

                            response.setCode(ResponseCode.SYSTEM_ERROR);
                            response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
                            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
                            return;
                        }

                        break;
                    case NO_MATCHED_MSG:
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        break;
                    case NO_NEW_MSG:
                        response.setCode(ResponseCode.PULL_NOT_FOUND);
                        break;
                    case OFFSET_ILLEGAL:
                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                        break;
                    default:
                        break;
                }

                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
            }


            @Override
            public void onException(Throwable e) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
            }
        };

        pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);

        return null;
    }

    /**
     * Writes {@code response} to the channel, first attaching the encoded
     * messages as the body when {@code msgList} is not {@code null}.
     *
     * <p>A message that fails to encode is logged and left out of the body.
     * Previously such a failure left a {@code null} buffer behind, which caused
     * a {@code NullPointerException} while assembling the body, so no response
     * was written at all.
     *
     * @param group consumer group, used for the statistics key
     * @param topic topic, used for the statistics key
     * @param ctx channel to write the response to
     * @param response the response command to send
     * @param msgList messages to attach, or {@code null} for a body-less response
     */
    private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response,
        final List<MessageExt> msgList) {
        if (null != msgList) {
            final List<ByteBuffer> encodedMsgs = new ArrayList<ByteBuffer>(msgList.size());
            int bodyTotalSize = 0;
            for (MessageExt msg : msgList) {
                try {
                    ByteBuffer encoded = messageToByteBuffer(msg);
                    bodyTotalSize += encoded.capacity();
                    encodedMsgs.add(encoded);
                } catch (Exception e) {
                    log.error("messageToByteBuffer UnsupportedEncodingException", e);
                }
            }

            ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
            for (ByteBuffer bb : encodedMsgs) {
                // Each buffer was filled to capacity; flip it for reading.
                bb.flip();
                body.put(bb);
            }

            response.setBody(body.array());

            // Count only the messages that actually made it into the body.
            this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, encodedMsgs.size());

            this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize);
        }

        try {
            ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        log.error("FilterServer response to " + future.channel().remoteAddress() + " failed", future.cause());
                        log.error(response.toString());
                    }
                }
            });
        } catch (Throwable e) {
            log.error("FilterServer process request over, but response failed", e);
            log.error(response.toString());
        }
    }

    /**
     * Serializes a message into a newly allocated buffer using the field order
     * listed in the body of this method.
     *
     * <p>Side effect: when the body length reaches the configured threshold and
     * compression yields data, the message's body is replaced by the compressed
     * bytes and the compressed flag is set in the written sys flag.
     *
     * @param msg the message to encode
     * @return a buffer whose position is at its end (callers flip it before reading)
     * @throws IOException declared for the compression and encoding steps
     */
    private ByteBuffer messageToByteBuffer(final MessageExt msg) throws IOException {
        int sysFlag = MessageSysFlag.clearCompressedFlag(msg.getSysFlag());
        if (msg.getBody() != null) {
            if (msg.getBody().length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch()) {
                byte[] data = UtilAll.compress(msg.getBody(), this.filtersrvController.getFiltersrvConfig().getZipCompressLevel());
                if (data != null) {
                    msg.setBody(data);
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                }
            }
        }

        final int bodyLength = msg.getBody() != null ? msg.getBody().length : 0;
        byte[] topicData = msg.getTopic().getBytes(MixAll.DEFAULT_CHARSET);
        final int topicLength = topicData.length;
        String properties = MessageDecoder.messageProperties2String(msg.getProperties());
        byte[] propertiesData = properties.getBytes(MixAll.DEFAULT_CHARSET);
        final int propertiesLength = propertiesData.length;
        final int msgLen = 4 // 1 TOTALSIZE
            + 4 // 2 MAGICCODE
            + 4 // 3 BODYCRC
            + 4 // 4 QUEUEID
            + 4 // 5 FLAG
            + 8 // 6 QUEUEOFFSET
            + 8 // 7 PHYSICALOFFSET
            + 4 // 8 SYSFLAG
            + 8 // 9 BORNTIMESTAMP
            + 8 // 10 BORNHOST
            + 8 // 11 STORETIMESTAMP
            + 8 // 12 STOREHOSTADDRESS
            + 4 // 13 RECONSUMETIMES
            + 8 // 14 Prepared Transaction Offset
            + 4 + bodyLength // 15 BODY
            + 1 + topicLength // 16 TOPIC
            + 2 + propertiesLength // 17 PROPERTIES
            + 0;

        ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);

        final MessageExt msgInner = msg;

        // 1 TOTALSIZE
        msgStoreItemMemory.putInt(msgLen);
        // 2 MAGICCODE
        msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
        // 3 BODYCRC
        msgStoreItemMemory.putInt(UtilAll.crc32(msgInner.getBody()));
        // 4 QUEUEID
        msgStoreItemMemory.putInt(msgInner.getQueueId());
        // 5 FLAG
        msgStoreItemMemory.putInt(msgInner.getFlag());
        // 6 QUEUEOFFSET
        msgStoreItemMemory.putLong(msgInner.getQueueOffset());
        // 7 PHYSICALOFFSET
        msgStoreItemMemory.putLong(msgInner.getCommitLogOffset());
        // 8 SYSFLAG
        msgStoreItemMemory.putInt(sysFlag);
        // 9 BORNTIMESTAMP
        msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
        // 10 BORNHOST
        msgStoreItemMemory.put(msgInner.getBornHostBytes());
        // 11 STORETIMESTAMP
        msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
        // 12 STOREHOSTADDRESS
        msgStoreItemMemory.put(msgInner.getStoreHostBytes());
        // 13 RECONSUMETIMES
        msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
        // 14 Prepared Transaction Offset
        msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
        // 15 BODY
        msgStoreItemMemory.putInt(bodyLength);
        if (bodyLength > 0) {
            msgStoreItemMemory.put(msgInner.getBody());
        }
        // 16 TOPIC
        msgStoreItemMemory.put((byte) topicLength);
        msgStoreItemMemory.put(topicData);
        // 17 PROPERTIES
        msgStoreItemMemory.putShort((short) propertiesLength);
        if (propertiesLength > 0) {
            msgStoreItemMemory.put(propertiesData);
        }

        return msgStoreItemMemory;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java new file mode 100644 index 0000000..3921c92 --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package com.alibaba.rocketmq.filtersrv.stats;

import com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.stats.StatsItemSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;


/**
 * Holds the filter server's two statistics sets, the number of messages and
 * the number of bytes returned, both keyed by {@code topic@group}.
 */
public class FilterServerStatsManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);

    // Single scheduler thread handed to both statistics sets.
    private final ScheduledExecutorService scheduledExecutorService;

    // ConsumerGroup Get Nums
    private final StatsItemSet groupGetNums;

    // ConsumerGroup Get Size
    private final StatsItemSet groupGetSize;


    public FilterServerStatsManager() {
        this.scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread"));
        this.groupGetNums = new StatsItemSet("GROUP_GET_NUMS", this.scheduledExecutorService, log);
        this.groupGetSize = new StatsItemSet("GROUP_GET_SIZE", this.scheduledExecutorService, log);
    }


    /** No-op; nothing needs to be started explicitly. */
    public void start() {
    }


    /** Shuts down the scheduler thread shared by the statistics sets. */
    public void shutdown() {
        this.scheduledExecutorService.shutdown();
    }


    /**
     * Adds {@code incValue} to the message-count statistic of the given group and topic.
     */
    public void incGroupGetNums(final String group, final String topic, final int incValue) {
        final String statsKey = buildStatsKey(topic, group);
        this.groupGetNums.addValue(statsKey, incValue, 1);
    }


    /**
     * Adds {@code incValue} to the byte-size statistic of the given group and topic.
     */
    public void incGroupGetSize(final String group, final String topic, final int incValue) {
        final String statsKey = buildStatsKey(topic, group);
        this.groupGetSize.addValue(statsKey, incValue, 1);
    }


    // Builds the statistics key in the form topic@group.
    private static String buildStatsKey(final String topic, final String group) {
        return topic + "@" + group;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-namesrv/pom.xml b/rocketmq-namesrv/pom.xml new file mode 100644 index 0000000..3494f8f --- /dev/null +++ b/rocketmq-namesrv/pom.xml @@ -0,0 +1,58 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-all</artifactId> + <version>4.0.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <artifactId>rocketmq-namesrv</artifactId> + <name>rocketmq-namesrv ${project.version}</name> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + </dependency> + <dependency> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-tools</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-srvutil</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + </dependency> + </dependencies> +</project> 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java ---------------------------------------------------------------------- diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java new file mode 100644 index 0000000..82f2622 --- /dev/null +++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.namesrv;

import com.alibaba.rocketmq.common.Configuration;
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.namesrv.NamesrvConfig;
import com.alibaba.rocketmq.namesrv.kvconfig.KVConfigManager;
import com.alibaba.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
import com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor;
import com.alibaba.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
import com.alibaba.rocketmq.namesrv.routeinfo.RouteInfoManager;
import com.alibaba.rocketmq.remoting.RemotingServer;
import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer;
import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/**
 * Wires together the name server's components: the KV config manager, the
 * route info manager, the remoting server with its request processor, and a
 * single-threaded scheduler for periodic housekeeping.
 *
 * @author shijia.wxr
 */
public class NamesrvController {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvConfig namesrvConfig;

    private final NettyServerConfig nettyServerConfig;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
    private final KVConfigManager kvConfigManager;
    private final RouteInfoManager routeInfoManager;

    // Created in initialize(); null until then.
    private RemotingServer remotingServer;

    private BrokerHousekeepingService brokerHousekeepingService;

    // Created in initialize(); null until then.
    private ExecutorService remotingExecutor;

    private Configuration configuration;


    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }


    /**
     * Loads the KV config, creates the remoting server and its worker pool,
     * registers the request processor and schedules the periodic tasks.
     *
     * @return always {@code true}
     */
    public boolean initialize() {

        this.kvConfigManager.load();

        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        // An exception escaping a scheduleAtFixedRate task suppresses all of its
        // later executions, so each task catches and logs instead of propagating.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                } catch (Throwable e) {
                    log.error("scanNotActiveBroker exception", e);
                }
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                } catch (Throwable e) {
                    log.error("printAllPeriodically exception", e);
                }
            }
        }, 1, 10, TimeUnit.MINUTES);

        return true;
    }


    // Registers the cluster-test processor when cluster test mode is configured,
    // otherwise the default processor; both run on the remoting executor.
    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {

            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {

            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }


    /**
     * Starts the remoting server.
     *
     * @throws Exception propagated from the remoting server
     */
    public void start() throws Exception {
        this.remotingServer.start();
    }


    /**
     * Shuts down the remoting server, its worker pool and the scheduler.
     * The first two only exist after {@link #initialize()}, so they are skipped
     * when still {@code null}.
     */
    public void shutdown() {
        if (this.remotingServer != null) {
            this.remotingServer.shutdown();
        }
        if (this.remotingExecutor != null) {
            this.remotingExecutor.shutdown();
        }
        this.scheduledExecutorService.shutdown();
    }


    public NamesrvConfig getNamesrvConfig() {
        return namesrvConfig;
    }


    public NettyServerConfig getNettyServerConfig() {
        return nettyServerConfig;
    }


    public KVConfigManager getKvConfigManager() {
        return kvConfigManager;
    }


    public RouteInfoManager getRouteInfoManager() {
        return routeInfoManager;
    }


    public RemotingServer getRemotingServer() {
        return remotingServer;
    }


    public void setRemotingServer(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
    }

    public Configuration getConfiguration() {
        return configuration;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java ---------------------------------------------------------------------- diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java new file mode 100644 index 0000000..286de3a --- /dev/null +++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.namesrv;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import com.alibaba.rocketmq.common.MQVersion;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.namesrv.NamesrvConfig;
import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
import com.alibaba.rocketmq.remoting.netty.NettySystemConfig;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.srvutil.ServerUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Command-line entry point of the name server: parses the options, builds the
 * configuration objects, configures logging, then creates, initializes and
 * starts a {@link NamesrvController}.
 *
 * @author shijia.wxr
 */
public class NamesrvStartup {
    public static Properties properties = null;
    public static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }

    /**
     * Boots the name server.
     *
     * <p>The process exits via {@code System.exit} when the command line cannot
     * be parsed, when option {@code p} is given (after printing the config
     * items), when the RocketMQ home is not set, when initialization fails, or
     * when any throwable is raised during startup.
     *
     * @param args command-line arguments
     * @return the started controller, or {@code null} on the exit paths
     */
    public static NamesrvController main0(String[] args) {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

        // Use 4096 for the socket buffers unless the system properties are set.
        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
            NettySystemConfig.socketSndbufSize = 4096;
        }

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
            NettySystemConfig.socketRcvbufSize = 4096;
        }

        try {
            //PackageConflictDetect.detectFastjson();

            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine =
                ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options),
                    new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
                return null;
            }

            final NamesrvConfig namesrvConfig = new NamesrvConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            nettyServerConfig.setListenPort(9876);
            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    try {
                        properties = new Properties();
                        properties.load(in);
                    } finally {
                        // Close the stream even when loading fails.
                        in.close();
                    }
                    MixAll.properties2Object(properties, namesrvConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);

                    namesrvConfig.setConfigStorePath(file);

                    // The path is passed as an argument: a '%' in it must not be
                    // interpreted as a format specifier.
                    System.out.printf("load config properties file OK, %s%n", file);
                }
            }

            if (commandLine.hasOption('p')) {
                MixAll.printObjectProperties(null, namesrvConfig);
                MixAll.printObjectProperties(null, nettyServerConfig);
                System.exit(0);
            }

            // Apply the command-line values on top of the file values.
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

            if (null == namesrvConfig.getRocketmqHome()) {
                System.out.printf("Please set the %s"
                    + " variable in your environment to match the location of the RocketMQ installation%n",
                    MixAll.ROCKETMQ_HOME_ENV);
                System.exit(-2);
            }

            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
            final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

            MixAll.printObjectProperties(log, namesrvConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);

            final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);

            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }

            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);


                @Override
                public void run() {
                    synchronized (this) {
                        log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long begineTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - begineTime;
                            log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));

            controller.start();

            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);

            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    /**
     * Adds the name server's own options to {@code options}: {@code c}
     * (config file, takes a value) and {@code p} (print config items), both optional.
     *
     * @param options the option set to extend
     * @return the same {@code options} instance
     */
    public static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("c", "configFile", true, "Name server config properties file");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("p", "printConfigItem", false, "Print all config item");
        opt.setRequired(false);
        options.addOption(opt);

        return options;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java new file mode 100644 index 0000000..a83586c --- /dev/null +++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.namesrv.kvconfig; + +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.protocol.body.KVTable; +import com.alibaba.rocketmq.namesrv.NamesrvController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +
/**
 * Holds the name server's namespaced key/value configuration in memory and
 * mirrors it to the file returned by {@code NamesrvConfig.getKvConfigPath()}.
 * <p>
 * All accessors except {@link #load()} take the internal read/write lock with
 * {@code lockInterruptibly()}. When the calling thread is interrupted while
 * waiting for the lock, the operation is abandoned, the error is logged and the
 * thread's interrupt status is restored for the caller.
 *
 * @author shijia.wxr
 */
public class KVConfigManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvController namesrvController;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
            new HashMap<String, HashMap<String, String>>();


    public KVConfigManager(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }


    /**
     * Reads the persisted KV file and merges its content into the in-memory table.
     * Does nothing when the file yields no content, cannot be parsed into a wrapper,
     * or the wrapper carries no table.
     * <p>
     * NOTE(review): this method does not take the lock; presumably it is only called
     * during start-up before requests are served — confirm against the caller.
     */
    public void load() {
        String content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
        if (content != null) {
            KVConfigSerializeWrapper kvConfigSerializeWrapper =
                    KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
            // A document without a "configTable" member leaves the field null;
            // HashMap.putAll(null) would throw a NullPointerException.
            if (null != kvConfigSerializeWrapper && null != kvConfigSerializeWrapper.getConfigTable()) {
                this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                log.info("load KV config table OK");
            }
        }
    }


    /**
     * Creates or overwrites one config item, creating the namespace on demand,
     * then persists the whole table.
     * If the thread is interrupted while waiting for the write lock, nothing is
     * changed and nothing is persisted.
     *
     * @param namespace namespace the item belongs to
     * @param key       item key
     * @param value     item value
     */
    public void putKVConfig(final String namespace, final String key, final String value) {
        try {
            this.lock.writeLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null == kvTable) {
                    kvTable = new HashMap<String, String>();
                    this.configTable.put(namespace, kvTable);
                    log.info("putKVConfig create new Namespace {}", namespace);
                }

                final String prev = kvTable.put(key, value);
                if (null != prev) {
                    log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",
                            namespace, key, value);
                } else {
                    log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
                            namespace, key, value);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putKVConfig InterruptedException", e);
            // Keep the interruption visible to the caller; the table was not modified,
            // so there is nothing new to persist.
            Thread.currentThread().interrupt();
            return;
        }

        this.persist();
    }

    /**
     * Serializes the current table to JSON and writes it to the configured KV file.
     * An {@link IOException} while writing is logged and not propagated.
     */
    public void persist() {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
                kvConfigSerializeWrapper.setConfigTable(this.configTable);

                String content = kvConfigSerializeWrapper.toJson();

                if (null != content) {
                    MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
                }
            } catch (IOException e) {
                log.error("persist kvconfig Exception, "
                        + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("persist InterruptedException", e);
            Thread.currentThread().interrupt();
        }

    }

    /**
     * Removes one config item (if its namespace exists) and persists the table.
     * If the thread is interrupted while waiting for the write lock, nothing is
     * changed and nothing is persisted.
     *
     * @param namespace namespace the item belongs to
     * @param key       item key
     */
    public void deleteKVConfig(final String namespace, final String key) {
        try {
            this.lock.writeLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    String value = kvTable.remove(key);
                    log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
                            namespace, key, value);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("deleteKVConfig InterruptedException", e);
            Thread.currentThread().interrupt();
            return;
        }

        this.persist();
    }

    /**
     * Encodes every item of one namespace as a {@link KVTable}.
     *
     * @param namespace namespace to export
     * @return the encoded table, or {@code null} when the namespace does not exist
     *         or the lock could not be acquired because of an interrupt
     */
    public byte[] getKVListByNamespace(final String namespace) {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    KVTable table = new KVTable();
                    table.setTable(kvTable);
                    return table.encode();
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getKVListByNamespace InterruptedException", e);
            Thread.currentThread().interrupt();
        }

        return null;
    }

    /**
     * Looks up a single config value.
     *
     * @param namespace namespace the item belongs to
     * @param key       item key
     * @return the stored value, or {@code null} when the namespace or key is absent
     *         or the lock could not be acquired because of an interrupt
     */
    public String getKVConfig(final String namespace, final String key) {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    return kvTable.get(key);
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getKVConfig InterruptedException", e);
            Thread.currentThread().interrupt();
        }

        return null;
    }

    /**
     * Writes the table size and every namespace/key/value triple to the log.
     */
    public void printAllPeriodically() {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                log.info("--------------------------------------------------------");

                log.info("configTable SIZE: {}", this.configTable.size());
                for (Entry<String, HashMap<String, String>> namespaceEntry : this.configTable.entrySet()) {
                    for (Entry<String, String> item : namespaceEntry.getValue().entrySet()) {
                        log.info("configTable NS: {} Key: {} Value: {}", namespaceEntry.getKey(), item.getKey(),
                                item.getValue());
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("printAllPeriodically InterruptedException", e);
            Thread.currentThread().interrupt();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java new file mode 100644 index 0000000..3a91028 --- /dev/null +++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.namesrv.kvconfig; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashMap; + + +
/**
 * Serializable envelope around the namespaced KV configuration table
 * (namespace -> key -> value).
 *
 * @author shijia.wxr
 */
public class KVConfigSerializeWrapper extends RemotingSerializable {
    /** Outer key: namespace. Inner map: key to value. */
    private HashMap<String, HashMap<String, String>> configTable;


    /**
     * @return the wrapped table; {@code null} until one has been set
     */
    public HashMap<String, HashMap<String, String>> getConfigTable() {
        return this.configTable;
    }


    /**
     * @param configTable the table to wrap; stored by reference, not copied
     */
    public void setConfigTable(HashMap<String, HashMap<String, String>> configTable) {
        this.configTable = configTable;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java new file mode 100644 index 0000000..b0b158d --- /dev/null +++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.namesrv.processor; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.help.FAQUrl; +import com.alibaba.rocketmq.common.namesrv.NamesrvUtil; +import com.alibaba.rocketmq.common.protocol.ResponseCode; +import com.alibaba.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.namesrv.NamesrvController; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +
/**
 * Request processor variant whose topic route lookup falls back to a
 * {@link DefaultMQAdminExt} client (unit name = {@code productEnvName}) whenever
 * the local route table has no entry for the requested topic.
 *
 * @author manhong.yqd
 */
public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final DefaultMQAdminExt adminExt;
    private final String productEnvName;


    /**
     * Creates the processor and starts the admin client used for fallback lookups.
     * A start failure is logged and not propagated, so the processor is still constructed.
     *
     * @param namesrvController controller of the local name server
     * @param productEnvName    used as the admin client's unit name and as the suffix
     *                          of its instance name
     */
    public ClusterTestRequestProcessor(NamesrvController namesrvController, String productEnvName) {
        super(namesrvController);
        this.productEnvName = productEnvName;
        adminExt = new DefaultMQAdminExt();
        adminExt.setInstanceName("CLUSTER_TEST_NS_INS_" + productEnvName);
        adminExt.setUnitName(productEnvName);
        try {
            adminExt.start();
        } catch (MQClientException e) {
            // Report through the name server logger instead of stderr so the failure
            // ends up in the same log as the fallback errors it will cause later.
            log.error("start admin client for product environment failed. envName=" + productEnvName, e);
        }
    }


    /**
     * Answers a route query from the local route table, or, when the topic is unknown
     * locally, from the admin client.
     * <p>
     * Unlike the parent implementation, the order-topic configuration is attached to
     * locally found route data unconditionally. Route data obtained through the admin
     * client is returned as received.
     *
     * @return a SUCCESS response whose body is the encoded route data, or a
     *         TOPIC_NOT_EXIST response when neither source knows the topic
     */
    @Override
    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
                (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
        if (topicRouteData != null) {
            String orderTopicConf =
                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                            requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        } else {
            try {
                topicRouteData = adminExt.examineTopicRouteInfo(requestHeader.getTopic());
            } catch (Exception e) {
                // Best effort: a failed fallback ends in TOPIC_NOT_EXIST below. The
                // exception is passed as the trailing argument so its cause is logged.
                log.info("get route info by topic from product environment failed. envName={},", productEnvName, e);
            }
        }

        if (topicRouteData != null) {
            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java new file mode 100644 index 0000000..118198e --- /dev/null +++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -0,0 +1,491 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.namesrv.processor; + +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.MQVersion.Version; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.help.FAQUrl; +import com.alibaba.rocketmq.common.namesrv.NamesrvUtil; +import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult; +import com.alibaba.rocketmq.common.protocol.RequestCode; +import com.alibaba.rocketmq.common.protocol.ResponseCode; +import com.alibaba.rocketmq.common.protocol.body.RegisterBrokerBody; +import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import com.alibaba.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader; +import com.alibaba.rocketmq.common.protocol.header.namesrv.*; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.namesrv.NamesrvController; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; +import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import 
java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; + + +
/**
 * Name server request dispatcher: {@link #processRequest} switches on the request
 * code and delegates to one handler per code. Handlers read or update the KV config
 * manager, the route info manager or the name server configuration and return the
 * response command.
 *
 * @author shijia.wxr
 */
public class DefaultRequestProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    protected final NamesrvController namesrvController;


    public DefaultRequestProcessor(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }


    /**
     * Dispatches one request to its handler.
     *
     * @return the handler's response, or {@code null} when the request code is not
     *         one of the codes handled here
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        if (log.isDebugEnabled()) {
            log.debug("receive request, {} {} {}",
                    request.getCode(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    request);
        }

        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                // Requests from version V3_0_11 on carry a RegisterBrokerBody (topic config
                // plus filter server list); older ones carry only the topic config wrapper.
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
            case RequestCode.GET_KVLIST_BY_NAMESPACE:
                return this.getKVListByNamespace(ctx, request);
            case RequestCode.GET_TOPICS_BY_CLUSTER:
                return this.getTopicsByCluster(ctx, request);
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                return this.getSystemTopicListFromNs(ctx, request);
            case RequestCode.GET_UNIT_TOPIC_LIST:
                return this.getUnitTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                return this.getHasUnitSubTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
            case RequestCode.UPDATE_NAMESRV_CONFIG:
                return this.updateConfig(ctx, request);
            case RequestCode.GET_NAMESRV_CONFIG:
                return this.getConfig(ctx, request);
            default:
                break;
        }
        return null;
    }

    /** This processor never rejects requests. */
    @Override
    public boolean rejectRequest() {
        return false;
    }

    /** Stores one KV config item and always answers SUCCESS. */
    public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final PutKVConfigRequestHeader requestHeader =
                (PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);

        this.namesrvController.getKvConfigManager().putKVConfig(
                requestHeader.getNamespace(),
                requestHeader.getKey(),
                requestHeader.getValue()
        );

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /**
     * Looks up one KV config item.
     *
     * @return SUCCESS with the value in the response header, or QUERY_NOT_FOUND when
     *         the KV config manager returns {@code null}
     */
    public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class);
        final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response.readCustomHeader();
        final GetKVConfigRequestHeader requestHeader =
                (GetKVConfigRequestHeader) request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class);

        String value = this.namesrvController.getKvConfigManager().getKVConfig(
                requestHeader.getNamespace(),
                requestHeader.getKey()
        );

        if (value != null) {
            responseHeader.setValue(value);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.QUERY_NOT_FOUND);
        response.setRemark("No config item, Namespace: " + requestHeader.getNamespace() + " Key: " + requestHeader.getKey());
        return response;
    }

    /** Deletes one KV config item and always answers SUCCESS. */
    public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final DeleteKVConfigRequestHeader requestHeader =
                (DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class);

        this.namesrvController.getKvConfigManager().deleteKVConfig(
                requestHeader.getNamespace(),
                requestHeader.getKey()
        );

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /**
     * Registers a broker whose request body is a {@link RegisterBrokerBody}.
     * A request without body registers with a data version of counter 0 / timestamp 0.
     * The response header carries the HA server and master addresses from the
     * registration result; the response body is the encoded order-topic KV namespace
     * (may be {@code null}).
     */
    public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
        final RegisterBrokerRequestHeader requestHeader =
                (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

        if (request.getBody() != null) {
            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
        } else {
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
        }

        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
                requestHeader.getClusterName(),
                requestHeader.getBrokerAddr(),
                requestHeader.getBrokerName(),
                requestHeader.getBrokerId(),
                requestHeader.getHaServerAddr(),
                registerBrokerBody.getTopicConfigSerializeWrapper(),
                registerBrokerBody.getFilterServerList(),
                ctx.channel());

        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());


        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /**
     * Registers a broker whose request body is a bare
     * {@link TopicConfigSerializeWrapper}; no filter server list is passed on.
     * Otherwise behaves like {@link #registerBrokerWithFilterServer}.
     */
    public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
        final RegisterBrokerRequestHeader requestHeader =
                (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

        TopicConfigSerializeWrapper topicConfigWrapper;
        if (request.getBody() != null) {
            topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
        } else {
            topicConfigWrapper = new TopicConfigSerializeWrapper();
            topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
            topicConfigWrapper.getDataVersion().setTimestatmp(0);
        }

        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
                requestHeader.getClusterName(),
                requestHeader.getBrokerAddr(),
                requestHeader.getBrokerName(),
                requestHeader.getBrokerId(),
                requestHeader.getHaServerAddr(),
                topicConfigWrapper,
                null,
                ctx.channel()
        );

        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());


        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /** Removes a broker from the route info manager and always answers SUCCESS. */
    public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final UnRegisterBrokerRequestHeader requestHeader =
                (UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);

        this.namesrvController.getRouteInfoManager().unregisterBroker(
                requestHeader.getClusterName(),
                requestHeader.getBrokerAddr(),
                requestHeader.getBrokerName(),
                requestHeader.getBrokerId());

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /**
     * Answers a topic route query from the local route table. The order-topic
     * configuration is attached only when {@code isOrderMessageEnable()} is set in
     * the name server config.
     *
     * @return SUCCESS with the encoded route data as body, or TOPIC_NOT_EXIST
     */
    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
                (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

        if (topicRouteData != null) {
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                String orderTopicConf =
                        this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                                requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }

            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }

    /** Returns the route info manager's cluster info as the response body. */
    private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        byte[] content = this.namesrvController.getRouteInfoManager().getAllClusterInfo();
        response.setBody(content);

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /**
     * Wipes the write permission of the named broker and reports the count returned
     * by the route info manager in the response header.
     */
    private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(WipeWritePermOfBrokerResponseHeader.class);
        final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader) response.readCustomHeader();
        final WipeWritePermOfBrokerRequestHeader requestHeader =
                (WipeWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class);

        int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName());

        log.info("wipe write perm of broker[{}], client: {}, {}",
                requestHeader.getBrokerName(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                wipeTopicCnt);

        responseHeader.setWipeTopicCount(wipeTopicCnt);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /** Returns the route info manager's full topic list as the response body. */
    private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        byte[] body = this.namesrvController.getRouteInfoManager().getAllTopicList();

        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /** Deletes a topic from the route info manager and always answers SUCCESS. */
    private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final DeleteTopicInNamesrvRequestHeader requestHeader =
                (DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);

        this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /**
     * Returns every KV item of one namespace.
     *
     * @return SUCCESS with the encoded table as body, or QUERY_NOT_FOUND when the
     *         KV config manager returns {@code null}
     */
    private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetKVListByNamespaceRequestHeader requestHeader =
                (GetKVListByNamespaceRequestHeader) request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class);

        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(
                requestHeader.getNamespace());
        if (null != jsonValue) {
            response.setBody(jsonValue);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.QUERY_NOT_FOUND);
        response.setRemark("No config item, Namespace: " + requestHeader.getNamespace());
        return response;
    }

    /** Returns the topics of the requested cluster as the response body. */
    private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetTopicsByClusterRequestHeader requestHeader =
                (GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);

        byte[] body = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());

        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }


    /** Returns the route info manager's system topic list as the response body. */
    private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        byte[] body = this.namesrvController.getRouteInfoManager().getSystemTopicList();

        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }


    /** Returns the result of {@code getUnitTopics()} as the response body. */
    private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        byte[] body = this.namesrvController.getRouteInfoManager().getUnitTopics();

        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }


    /** Returns the result of {@code getHasUnitSubTopicList()} as the response body. */
    private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();

        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }


    /** Returns the result of {@code getHasUnitSubUnUnitTopicList()} as the response body. */
    private RemotingCommand getHasUnitSubUnUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList();

        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /**
     * Applies the properties carried in the request body to the name server
     * configuration. A request without body changes nothing and still answers SUCCESS.
     *
     * @return SUCCESS, or SYSTEM_ERROR when the body cannot be decoded with
     *         {@code MixAll.DEFAULT_CHARSET} or cannot be parsed into properties
     */
    private RemotingCommand updateConfig(ChannelHandlerContext ctx, RemotingCommand request) {
        log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        byte[] body = request.getBody();
        if (body != null) {
            // new String(...) never yields null, so the only decoding failure to
            // handle is the charset exception.
            String bodyStr;
            try {
                bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
            } catch (UnsupportedEncodingException e) {
                log.error("updateConfig byte array to string error: ", e);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UnsupportedEncodingException " + e);
                return response;
            }

            Properties properties = MixAll.string2Properties(bodyStr);
            if (properties == null) {
                log.error("updateConfig MixAll.string2Properties error {}", bodyStr);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("string2Properties error");
                return response;
            }

            this.namesrvController.getConfiguration().update(properties);
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    /**
     * Returns the formatted name server configuration as the response body. The body
     * is left unset when the formatted string is {@code null} or empty.
     *
     * @return SUCCESS, or SYSTEM_ERROR when the string cannot be encoded with
     *         {@code MixAll.DEFAULT_CHARSET}
     */
    private RemotingCommand getConfig(ChannelHandlerContext ctx, RemotingCommand request) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        String content = this.namesrvController.getConfiguration().getAllConfigsFormatString();
        if (content != null && content.length() > 0) {
            try {
                response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
            } catch (UnsupportedEncodingException e) {
                log.error("getConfig error, ", e);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UnsupportedEncodingException " + e);
                return response;
            }
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java ---------------------------------------------------------------------- diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java 
b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java new file mode 100644 index 0000000..2f123fb --- /dev/null +++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.namesrv.routeinfo; + +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.namesrv.NamesrvController; +import com.alibaba.rocketmq.remoting.ChannelEventListener; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +
/**
 * Channel event listener that tells the route info manager about every channel
 * that was closed, failed with an exception or went idle. Connect events are ignored.
 *
 * @author shijia.wxr
 */
public class BrokerHousekeepingService implements ChannelEventListener {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;


    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }


    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
        // Intentionally empty: a new connection needs no housekeeping.
    }


    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        reportChannelDestroyed(remoteAddr, channel);
    }


    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        reportChannelDestroyed(remoteAddr, channel);
    }


    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        reportChannelDestroyed(remoteAddr, channel);
    }


    /** Forwards the channel to the route info manager's {@code onChannelDestroy}. */
    private void reportChannelDestroyed(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}
