http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java deleted file mode 100644 index 3921c92..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.alibaba.rocketmq.filtersrv.stats; - -import com.alibaba.rocketmq.common.ThreadFactoryImpl; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.stats.StatsItemSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - - -public class FilterServerStatsManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread")); - - // ConsumerGroup Get Nums - private final StatsItemSet groupGetNums = new StatsItemSet("GROUP_GET_NUMS", - this.scheduledExecutorService, log); - - // ConsumerGroup Get Size - private final StatsItemSet groupGetSize = new StatsItemSet("GROUP_GET_SIZE", - this.scheduledExecutorService, log); - - - public FilterServerStatsManager() { - } - - - public void start() { - } - - - public void shutdown() { - this.scheduledExecutorService.shutdown(); - } - - - public void incGroupGetNums(final String group, final String topic, final int incValue) { - this.groupGetNums.addValue(topic + "@" + group, incValue, 1); - } - - - public void incGroupGetSize(final String group, final String topic, final int incValue) { - this.groupGetSize.addValue(topic + "@" + group, incValue, 1); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java new file mode 100644 index 0000000..1663dfc --- /dev/null +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.filtersrv; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; +import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + + +/** + * @author shijia.wxr + */ +public class FilterServerOuterAPI { + private final RemotingClient remotingClient; + + + public FilterServerOuterAPI() { + this.remotingClient = new NettyRemotingClient(new NettyClientConfig()); + } + + + public void start() { + this.remotingClient.start(); + } + + + public void shutdown() { + this.remotingClient.shutdown(); + } + + + public RegisterFilterServerResponseHeader registerFilterServerToBroker( + final String brokerAddr, + final String filterServerAddr + ) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, InterruptedException, MQBrokerException { + RegisterFilterServerRequestHeader requestHeader = new RegisterFilterServerRequestHeader(); + requestHeader.setFilterServerAddr(filterServerAddr); + RemotingCommand request = + RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000); + assert 
response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + RegisterFilterServerResponseHeader responseHeader = + (RegisterFilterServerResponseHeader) response + .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class); + + return responseHeader; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java new file mode 100644 index 0000000..ec0381d --- /dev/null +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.filtersrv; + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.annotation.ImportantField; +import org.apache.rocketmq.remoting.common.RemotingUtil; + + +public class FiltersrvConfig { + private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, + System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + + @ImportantField + private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, + System.getenv(MixAll.NAMESRV_ADDR_ENV)); + + private String connectWhichBroker = "127.0.0.1:10911"; + private String filterServerIP = RemotingUtil.getLocalAddress(); + + private int compressMsgBodyOverHowmuch = 1024 * 8; + private int zipCompressLevel = 5; + + + private boolean clientUploadFilterClassEnable = true; + + + private String filterClassRepertoryUrl = "http://fsrep.tbsite.net/filterclass"; + + private int fsServerAsyncSemaphoreValue = 2048; + private int fsServerCallbackExecutorThreads = 64; + private int fsServerWorkerThreads = 64; + + + public String getRocketmqHome() { + return rocketmqHome; + } + + + public void setRocketmqHome(String rocketmqHome) { + this.rocketmqHome = rocketmqHome; + } + + + public String getNamesrvAddr() { + return namesrvAddr; + } + + + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + + public String getConnectWhichBroker() { + return connectWhichBroker; + } + + + public void setConnectWhichBroker(String connectWhichBroker) { + this.connectWhichBroker = connectWhichBroker; + } + + + public String getFilterServerIP() { + return filterServerIP; + } + + + public void setFilterServerIP(String filterServerIP) { + this.filterServerIP = filterServerIP; + } + + + public int getCompressMsgBodyOverHowmuch() { + return compressMsgBodyOverHowmuch; + } + + + public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) { + this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; + } + + + public int 
getZipCompressLevel() { + return zipCompressLevel; + } + + + public void setZipCompressLevel(int zipCompressLevel) { + this.zipCompressLevel = zipCompressLevel; + } + + + public boolean isClientUploadFilterClassEnable() { + return clientUploadFilterClassEnable; + } + + + public void setClientUploadFilterClassEnable(boolean clientUploadFilterClassEnable) { + this.clientUploadFilterClassEnable = clientUploadFilterClassEnable; + } + + + public String getFilterClassRepertoryUrl() { + return filterClassRepertoryUrl; + } + + + public void setFilterClassRepertoryUrl(String filterClassRepertoryUrl) { + this.filterClassRepertoryUrl = filterClassRepertoryUrl; + } + + + public int getFsServerAsyncSemaphoreValue() { + return fsServerAsyncSemaphoreValue; + } + + + public void setFsServerAsyncSemaphoreValue(int fsServerAsyncSemaphoreValue) { + this.fsServerAsyncSemaphoreValue = fsServerAsyncSemaphoreValue; + } + + + public int getFsServerCallbackExecutorThreads() { + return fsServerCallbackExecutorThreads; + } + + + public void setFsServerCallbackExecutorThreads(int fsServerCallbackExecutorThreads) { + this.fsServerCallbackExecutorThreads = fsServerCallbackExecutorThreads; + } + + + public int getFsServerWorkerThreads() { + return fsServerWorkerThreads; + } + + + public void setFsServerWorkerThreads(int fsServerWorkerThreads) { + this.fsServerWorkerThreads = fsServerWorkerThreads; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java new file mode 100644 index 0000000..cb862a6 --- /dev/null +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvController.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.filtersrv; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; +import org.apache.rocketmq.filtersrv.filter.FilterClassManager; +import org.apache.rocketmq.filtersrv.processor.DefaultRequestProcessor; +import org.apache.rocketmq.filtersrv.stats.FilterServerStatsManager; +import org.apache.rocketmq.remoting.RemotingServer; +import org.apache.rocketmq.remoting.netty.NettyRemotingServer; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +/** + * @author shijia.wxr + */ +public class FiltersrvController { + private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + + private final 
FiltersrvConfig filtersrvConfig; + + private final NettyServerConfig nettyServerConfig; + private final FilterClassManager filterClassManager; + + private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI(); + private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer( + MixAll.FILTERSRV_CONSUMER_GROUP); + + private final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread")); + private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager(); + + private RemotingServer remotingServer; + + private ExecutorService remotingExecutor; + private volatile String brokerName = null; + + + public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) { + this.filtersrvConfig = filtersrvConfig; + this.nettyServerConfig = nettyServerConfig; + this.filterClassManager = new FilterClassManager(this); + } + + + public boolean initialize() { + + MixAll.printObjectProperties(log, this.filtersrvConfig); + + + this.remotingServer = new NettyRemotingServer(this.nettyServerConfig); + + + this.remotingExecutor = + Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), + new ThreadFactoryImpl("RemotingExecutorThread_")); + + this.registerProcessor(); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + FiltersrvController.this.registerFilterServerToBroker(); + } + }, 3, 10, TimeUnit.SECONDS); + + this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer + .getBrokerSuspendMaxTimeMillis() - 1000); + this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer + .getConsumerTimeoutMillisWhenSuspend() - 1000); + + this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr()); + 
this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid())); + + return true; + } + + private void registerProcessor() { + this.remotingServer + .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); + } + + public void registerFilterServerToBroker() { + try { + RegisterFilterServerResponseHeader responseHeader = + this.filterServerOuterAPI.registerFilterServerToBroker( + this.filtersrvConfig.getConnectWhichBroker(), this.localAddr()); + this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() + .setDefaultBrokerId(responseHeader.getBrokerId()); + + if (null == this.brokerName) { + this.brokerName = responseHeader.getBrokerName(); + } + + log.info("register filter server<{}> to broker<{}> OK, Return: {} {}", + this.localAddr(), + this.filtersrvConfig.getConnectWhichBroker(), + responseHeader.getBrokerName(), + responseHeader.getBrokerId()); + } catch (Exception e) { + log.warn("register filter server Exception", e); + + log.warn("access broker failed, kill oneself"); + System.exit(-1); + } + } + + public String localAddr() { + return String.format("%s:%d", this.filtersrvConfig.getFilterServerIP(), + this.remotingServer.localListenPort()); + } + + public void start() throws Exception { + this.defaultMQPullConsumer.start(); + this.remotingServer.start(); + this.filterServerOuterAPI.start(); + this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() + .setConnectBrokerByUser(true); + this.filterClassManager.start(); + this.filterServerStatsManager.start(); + } + + + public void shutdown() { + this.remotingServer.shutdown(); + this.remotingExecutor.shutdown(); + this.scheduledExecutorService.shutdown(); + this.defaultMQPullConsumer.shutdown(); + this.filterServerOuterAPI.shutdown(); + this.filterClassManager.shutdown(); + this.filterServerStatsManager.shutdown(); + } + + + public RemotingServer getRemotingServer() { + return remotingServer; + } + + + public void 
setRemotingServer(RemotingServer remotingServer) { + this.remotingServer = remotingServer; + } + + + public ExecutorService getRemotingExecutor() { + return remotingExecutor; + } + + + public void setRemotingExecutor(ExecutorService remotingExecutor) { + this.remotingExecutor = remotingExecutor; + } + + + public FiltersrvConfig getFiltersrvConfig() { + return filtersrvConfig; + } + + + public NettyServerConfig getNettyServerConfig() { + return nettyServerConfig; + } + + + public ScheduledExecutorService getScheduledExecutorService() { + return scheduledExecutorService; + } + + + public FilterServerOuterAPI getFilterServerOuterAPI() { + return filterServerOuterAPI; + } + + + public FilterClassManager getFilterClassManager() { + return filterClassManager; + } + + + public DefaultMQPullConsumer getDefaultMQPullConsumer() { + return defaultMQPullConsumer; + } + + + public String getBrokerName() { + return brokerName; + } + + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + + public FilterServerStatsManager getFilterServerStatsManager() { + return filterServerStatsManager; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java new file mode 100644 index 0000000..4e1fbc4 --- /dev/null +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.filtersrv; + +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.joran.JoranConfigurator; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.netty.NettySystemConfig; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * @author shijia.wxr + */ +public class FiltersrvStartup { + public static Logger log; + + public static void main(String[] args) { + start(createController(args)); + } + + public static FiltersrvController start(FiltersrvController controller) { + + try { + controller.start(); + } catch (Exception e) { + e.printStackTrace(); + System.exit(-1); + } + + String tip = "The Filter Server boot success, " + controller.localAddr(); + log.info(tip); + 
System.out.printf("%s%n", tip); + + return controller; + } + + public static FiltersrvController createController(String[] args) { + System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + + + if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { + NettySystemConfig.socketSndbufSize = 65535; + } + + + if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { + NettySystemConfig.socketRcvbufSize = 1024; + } + + try { + Options options = ServerUtil.buildCommandlineOptions(new Options()); + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options), + new PosixParser()); + if (null == commandLine) { + System.exit(-1); + return null; + } + + final FiltersrvConfig filtersrvConfig = new FiltersrvConfig(); + final NettyServerConfig nettyServerConfig = new NettyServerConfig(); + + if (commandLine.hasOption('c')) { + String file = commandLine.getOptionValue('c'); + if (file != null) { + InputStream in = new BufferedInputStream(new FileInputStream(file)); + Properties properties = new Properties(); + properties.load(in); + MixAll.properties2Object(properties, filtersrvConfig); + System.out.printf("load config properties file OK, " + file + "%n"); + in.close(); + + String port = properties.getProperty("listenPort"); + if (port != null) { + filtersrvConfig.setConnectWhichBroker(String.format("127.0.0.1:%s", port)); + } + } + } + + nettyServerConfig.setListenPort(0); + nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue()); + nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig + .getFsServerCallbackExecutorThreads()); + nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads()); + + if (commandLine.hasOption('p')) { + MixAll.printObjectProperties(null, filtersrvConfig); + MixAll.printObjectProperties(null, 
nettyServerConfig); + System.exit(0); + } + + MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig); + if (null == filtersrvConfig.getRocketmqHome()) { + System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV + + " variable in your environment to match the location of the RocketMQ installation%n"); + System.exit(-2); + } + + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + JoranConfigurator configurator = new JoranConfigurator(); + configurator.setContext(lc); + lc.reset(); + configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml"); + log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + + final FiltersrvController controller = + new FiltersrvController(filtersrvConfig, nettyServerConfig); + boolean initResult = controller.initialize(); + if (!initResult) { + controller.shutdown(); + System.exit(-3); + } + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + private volatile boolean hasShutdown = false; + private AtomicInteger shutdownTimes = new AtomicInteger(0); + + @Override + public void run() { + synchronized (this) { + log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); + if (!this.hasShutdown) { + this.hasShutdown = true; + long begineTime = System.currentTimeMillis(); + controller.shutdown(); + long consumingTimeTotal = System.currentTimeMillis() - begineTime; + log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); + } + } + } + }, "ShutdownHook")); + + return controller; + } catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); + } + return null; + } + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("c", "configFile", true, "Filter server config properties file"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("p", "printConfigItem", false, "Print all config item"); + 
opt.setRequired(false); + options.addOption(opt); + + return options; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java new file mode 100644 index 0000000..fd95685 --- /dev/null +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java @@ -0,0 +1,393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.filtersrv.filter; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.filter.FilterAPI; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.tools.JavaCompiler; +import javax.tools.ToolProvider; +import java.io.*; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.net.URLDecoder; +import java.util.*; + + +public class DynaCode { + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + + private static final String FILE_SP = System.getProperty("file.separator"); + + private static final String LINE_SP = System.getProperty("line.separator"); + + private String sourcePath = System.getProperty("user.home") + FILE_SP + "rocketmq_filter_class" + FILE_SP + + UtilAll.getPid(); + + private String outPutClassPath = sourcePath; + + + private ClassLoader parentClassLoader; + + + private List<String> codeStrs; + + + private Map<String/* fullClassName */, Class<?>/* class */> loadClass; + + + private String classpath; + + + private String bootclasspath; + + + private String extdirs; + + + private String encoding = "UTF-8"; + + + private String target; + + + @SuppressWarnings("unchecked") + public DynaCode(String code) { + this(Thread.currentThread().getContextClassLoader(), Arrays.asList(code)); + } + + + public DynaCode(ClassLoader parentClassLoader, List<String> codeStrs) { + this(extractClasspath(parentClassLoader), parentClassLoader, codeStrs); + } + + + public DynaCode(String classpath, ClassLoader parentClassLoader, List<String> codeStrs) { + this.classpath = classpath; + this.parentClassLoader = parentClassLoader; + this.codeStrs = codeStrs; + this.loadClass = new HashMap<String, Class<?>>(codeStrs.size()); + } + + + private 
static String extractClasspath(ClassLoader cl) { + StringBuffer buf = new StringBuffer(); + while (cl != null) { + if (cl instanceof URLClassLoader) { + URL urls[] = ((URLClassLoader) cl).getURLs(); + for (int i = 0; i < urls.length; i++) { + if (buf.length() > 0) { + buf.append(File.pathSeparatorChar); + } + String s = urls[i].getFile(); + try { + s = URLDecoder.decode(s, "UTF-8"); + } catch (UnsupportedEncodingException e) { + continue; + } + File f = new File(s); + buf.append(f.getAbsolutePath()); + } + } + cl = cl.getParent(); + } + return buf.toString(); + } + + + public DynaCode(List<String> codeStrs) { + this(Thread.currentThread().getContextClassLoader(), codeStrs); + } + + public static Class<?> compileAndLoadClass(final String className, final String javaSource) + throws Exception { + String classSimpleName = FilterAPI.simpleClassName(className); + String javaCode = javaSource; + + final String newClassSimpleName = classSimpleName + System.currentTimeMillis(); + String newJavaCode = javaCode.replaceAll(classSimpleName, newClassSimpleName); + + List<String> codes = new ArrayList<String>(); + codes.add(newJavaCode); + DynaCode dc = new DynaCode(codes); + dc.compileAndLoadClass(); + Map<String, Class<?>> map = dc.getLoadClass(); + + Class<?> clazz = map.get(getQualifiedName(newJavaCode)); + return clazz; + } + + public void compileAndLoadClass() throws Exception { + String[] sourceFiles = this.uploadSrcFile(); + this.compile(sourceFiles); + this.loadClass(this.loadClass.keySet()); + } + + public Map<String, Class<?>> getLoadClass() { + return loadClass; + } + + public static String getQualifiedName(String code) { + StringBuilder sb = new StringBuilder(); + String className = getClassName(code); + if (StringUtils.isNotBlank(className)) { + + String packageName = getPackageName(code); + if (StringUtils.isNotBlank(packageName)) { + sb.append(packageName).append("."); + } + sb.append(className); + } + return sb.toString(); + } + + private String[] 
uploadSrcFile() throws Exception { + List<String> srcFileAbsolutePaths = new ArrayList<String>(codeStrs.size()); + for (String code : codeStrs) { + if (StringUtils.isNotBlank(code)) { + String packageName = getPackageName(code); + String className = getClassName(code); + if (StringUtils.isNotBlank(className)) { + File srcFile = null; + BufferedWriter bufferWriter = null; + try { + if (StringUtils.isBlank(packageName)) { + File pathFile = new File(sourcePath); + + if (!pathFile.exists()) { + if (!pathFile.mkdirs()) { + throw new RuntimeException("create PathFile Error!"); + } + } + srcFile = new File(sourcePath + FILE_SP + className + ".java"); + } else { + String srcPath = StringUtils.replace(packageName, ".", FILE_SP); + File pathFile = new File(sourcePath + FILE_SP + srcPath); + + if (!pathFile.exists()) { + if (!pathFile.mkdirs()) { + throw new RuntimeException("create PathFile Error!"); + } + } + srcFile = new File(pathFile.getAbsolutePath() + FILE_SP + className + ".java"); + } + synchronized (loadClass) { + loadClass.put(getFullClassName(code), null); + } + if (null != srcFile) { + LOGGER.warn("Dyna Create Java Source File:---->" + srcFile.getAbsolutePath()); + srcFileAbsolutePaths.add(srcFile.getAbsolutePath()); + srcFile.deleteOnExit(); + } + OutputStreamWriter outputStreamWriter = + new OutputStreamWriter(new FileOutputStream(srcFile), encoding); + bufferWriter = new BufferedWriter(outputStreamWriter); + for (String lineCode : code.split(LINE_SP)) { + bufferWriter.write(lineCode); + bufferWriter.newLine(); + } + bufferWriter.flush(); + } finally { + if (null != bufferWriter) { + bufferWriter.close(); + } + } + } + } + } + return srcFileAbsolutePaths.toArray(new String[srcFileAbsolutePaths.size()]); + } + + private void compile(String[] srcFiles) throws Exception { + String args[] = this.buildCompileJavacArgs(srcFiles); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + if (compiler 
== null) { + throw new NullPointerException( + "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!"); + } + int resultCode = compiler.run(null, null, err, args); + if (resultCode != 0) { + throw new Exception(err.toString(RemotingHelper.DEFAULT_CHARSET)); + } + } + + private void loadClass(Set<String> classFullNames) throws ClassNotFoundException, MalformedURLException { + synchronized (loadClass) { + ClassLoader classLoader = + new URLClassLoader(new URL[]{new File(outPutClassPath).toURI().toURL()}, + parentClassLoader); + for (String key : classFullNames) { + Class<?> classz = classLoader.loadClass(key); + if (null != classz) { + loadClass.put(key, classz); + LOGGER.info("Dyna Load Java Class File OK:----> className: " + key); + } else { + LOGGER.error("Dyna Load Java Class File Fail:----> className: " + key); + } + } + } + } + + public static String getClassName(String code) { + String className = StringUtils.substringBefore(code, "{"); + if (StringUtils.isBlank(className)) { + return className; + } + if (StringUtils.contains(code, " class ")) { + className = StringUtils.substringAfter(className, " class "); + if (StringUtils.contains(className, " extends ")) { + className = StringUtils.substringBefore(className, " extends ").trim(); + } else if (StringUtils.contains(className, " implements ")) { + className = StringUtils.trim(StringUtils.substringBefore(className, " implements ")); + } else { + className = StringUtils.trim(className); + } + } else if (StringUtils.contains(code, " interface ")) { + className = StringUtils.substringAfter(className, " interface "); + if (StringUtils.contains(className, " extends ")) { + className = StringUtils.substringBefore(className, " extends ").trim(); + } else { + className = StringUtils.trim(className); + } + } else if (StringUtils.contains(code, " enum ")) { + className = StringUtils.trim(StringUtils.substringAfter(className, " enum ")); + } else { + return StringUtils.EMPTY; + } + return 
className; + } + + public static String getPackageName(String code) { + String packageName = + StringUtils.substringBefore(StringUtils.substringAfter(code, "package "), ";").trim(); + return packageName; + } + + public static String getFullClassName(String code) { + String packageName = getPackageName(code); + String className = getClassName(code); + return StringUtils.isBlank(packageName) ? className : packageName + "." + className; + } + + private String[] buildCompileJavacArgs(String srcFiles[]) { + ArrayList<String> args = new ArrayList<String>(); + if (StringUtils.isNotBlank(classpath)) { + args.add("-classpath"); + args.add(classpath); + } + if (StringUtils.isNotBlank(outPutClassPath)) { + args.add("-d"); + args.add(outPutClassPath); + } + if (StringUtils.isNotBlank(sourcePath)) { + args.add("-sourcepath"); + args.add(sourcePath); + } + if (StringUtils.isNotBlank(bootclasspath)) { + args.add("-bootclasspath"); + args.add(bootclasspath); + } + if (StringUtils.isNotBlank(extdirs)) { + args.add("-extdirs"); + args.add(extdirs); + } + if (StringUtils.isNotBlank(encoding)) { + args.add("-encoding"); + args.add(encoding); + } + if (StringUtils.isNotBlank(target)) { + args.add("-target"); + args.add(target); + } + for (int i = 0; i < srcFiles.length; i++) { + args.add(srcFiles[i]); + } + return args.toArray(new String[args.size()]); + } + + public String getOutPutClassPath() { + return outPutClassPath; + } + + public void setOutPutClassPath(String outPutClassPath) { + this.outPutClassPath = outPutClassPath; + } + + public String getSourcePath() { + return sourcePath; + } + + public void setSourcePath(String sourcePath) { + this.sourcePath = sourcePath; + } + + public ClassLoader getParentClassLoader() { + return parentClassLoader; + } + + public void setParentClassLoader(ClassLoader parentClassLoader) { + this.parentClassLoader = parentClassLoader; + } + + public String getClasspath() { + return classpath; + } + + public void setClasspath(String classpath) { + 
this.classpath = classpath; + } + + public String getBootclasspath() { + return bootclasspath; + } + + public void setBootclasspath(String bootclasspath) { + this.bootclasspath = bootclasspath; + } + + public String getExtdirs() { + return extdirs; + } + + public void setExtdirs(String extdirs) { + this.extdirs = extdirs; + } + + public String getEncoding() { + return encoding; + } + + public void setEncoding(String encoding) { + this.encoding = encoding; + } + + public String getTarget() { + return target; + } + + public void setTarget(String target) { + this.target = target; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java new file mode 100644 index 0000000..36d6b7e --- /dev/null +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassFetchMethod.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Strategy for obtaining the Java source text of a message-filter class.
 */
public interface FilterClassFetchMethod {
    /**
     * Fetches the source code of a filter class.
     *
     * @param topic topic the filter applies to
     * @param consumerGroup consumer group the filter applies to
     * @param className name of the filter class whose source is requested
     * @return the source text; the HTTP implementation in this package returns {@code null}
     *         when the source cannot be retrieved, so callers should expect {@code null}
     */
    String fetch(String topic, String consumerGroup, String className);
}
+ */ + +package org.apache.rocketmq.filtersrv.filter; + +import org.apache.rocketmq.common.filter.MessageFilter; + + +public class FilterClassInfo { + private String className; + private int classCRC; + private MessageFilter messageFilter; + + + public int getClassCRC() { + return classCRC; + } + + + public void setClassCRC(int classCRC) { + this.classCRC = classCRC; + } + + + public MessageFilter getMessageFilter() { + return messageFilter; + } + + + public void setMessageFilter(MessageFilter messageFilter) { + this.messageFilter = messageFilter; + } + + + public String getClassName() { + return className; + } + + + public void setClassName(String className) { + this.className = className; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java new file mode 100644 index 0000000..3269852 --- /dev/null +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassLoader.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Class loader that exposes {@code defineClass} publicly so a class can be created directly
 * from an in-memory byte array.
 */
public class FilterClassLoader extends ClassLoader {
    /**
     * Defines a class from a slice of {@code b}.
     *
     * @param name expected binary name of the class
     * @param b buffer containing the class file bytes
     * @param off offset of the first class file byte in {@code b}
     * @param len number of class file bytes
     * @return the newly defined class
     * @throws ClassFormatError if the bytes are not a valid class file
     */
    public final Class<?> createNewClass(String name, byte[] b, int off, int len) throws ClassFormatError {
        Class<?> defined = defineClass(name, b, off, len);
        return defined;
    }
}
+ */ + +package org.apache.rocketmq.filtersrv.filter; + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.filter.MessageFilter; +import org.apache.rocketmq.filtersrv.FiltersrvController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +public class FilterClassManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + + private final Object compileLock = new Object(); + private final FiltersrvController filtersrvController; + + private final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread")); + private ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable = + new ConcurrentHashMap<String, FilterClassInfo>(128); + private FilterClassFetchMethod filterClassFetchMethod; + + + public FilterClassManager(FiltersrvController filtersrvController) { + this.filtersrvController = filtersrvController; + this.filterClassFetchMethod = + new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig() + .getFilterClassRepertoryUrl()); + } + + + public void start() { + if (!this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + fetchClassFromRemoteHost(); + } + }, 1, 1, TimeUnit.MINUTES); + } + } + + private void fetchClassFromRemoteHost() { + Iterator<Entry<String, FilterClassInfo>> it = this.filterClassTable.entrySet().iterator(); 
+ while (it.hasNext()) { + try { + Entry<String, FilterClassInfo> next = it.next(); + FilterClassInfo filterClassInfo = next.getValue(); + String[] topicAndGroup = next.getKey().split("@"); + String responseStr = + this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1], + filterClassInfo.getClassName()); + byte[] filterSourceBinary = responseStr.getBytes("UTF-8"); + int classCRC = UtilAll.crc32(responseStr.getBytes("UTF-8")); + if (classCRC != filterClassInfo.getClassCRC()) { + String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); + Class<?> newClass = + DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource); + Object newInstance = newClass.newInstance(); + filterClassInfo.setMessageFilter((MessageFilter) newInstance); + filterClassInfo.setClassCRC(classCRC); + + log.info("fetch Remote class File OK, {} {}", next.getKey(), + filterClassInfo.getClassName()); + } + } catch (Exception e) { + log.error("fetchClassFromRemoteHost Exception", e); + } + } + } + + public void shutdown() { + this.scheduledExecutorService.shutdown(); + } + + public boolean registerFilterClass(final String consumerGroup, final String topic, + final String className, final int classCRC, final byte[] filterSourceBinary) { + final String key = buildKey(consumerGroup, topic); + + + boolean registerNew = false; + FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key); + if (null == filterClassInfoPrev) { + registerNew = true; + } else { + if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { + if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { + registerNew = true; + } + } + } + + if (registerNew) { + synchronized (this.compileLock) { + filterClassInfoPrev = this.filterClassTable.get(key); + if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) { + return true; + } + + try { + + FilterClassInfo filterClassInfoNew = new FilterClassInfo(); + 
filterClassInfoNew.setClassName(className); + filterClassInfoNew.setClassCRC(0); + filterClassInfoNew.setMessageFilter(null); + + if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { + String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); + Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource); + Object newInstance = newClass.newInstance(); + filterClassInfoNew.setMessageFilter((MessageFilter) newInstance); + filterClassInfoNew.setClassCRC(classCRC); + } + + this.filterClassTable.put(key, filterClassInfoNew); + } catch (Throwable e) { + String info = + String + .format( + "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s", + consumerGroup, topic, className); + log.error(info, e); + return false; + } + } + } + + return true; + } + + private static String buildKey(final String consumerGroup, final String topic) { + return topic + "@" + consumerGroup; + } + + public FilterClassInfo findFilterClass(final String consumerGroup, final String topic) { + return this.filterClassTable.get(buildKey(consumerGroup, topic)); + } + + + public FilterClassFetchMethod getFilterClassFetchMethod() { + return filterClassFetchMethod; + } + + + public void setFilterClassFetchMethod(FilterClassFetchMethod filterClassFetchMethod) { + this.filterClassFetchMethod = filterClassFetchMethod; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java new file mode 100644 index 0000000..c8b1515 --- /dev/null +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java @@ 
-0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filtersrv.filter; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.HttpTinyClient; +import org.apache.rocketmq.common.utils.HttpTinyClient.HttpResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HttpFilterClassFetchMethod implements FilterClassFetchMethod { + private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + private final String url; + + + public HttpFilterClassFetchMethod(String url) { + this.url = url; + } + + + @Override + public String fetch(String topic, String consumerGroup, String className) { + String thisUrl = String.format("%s/%s.java", this.url, className); + + try { + HttpResult result = HttpTinyClient.httpGet(thisUrl, null, null, "UTF-8", 5000); + if (200 == result.code) { + return result.content; + } + } catch (Exception e) { + log.error( + String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e); + } + + return null; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java new file mode 100644 index 0000000..5553952 --- /dev/null +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.filtersrv.processor; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.filter.FilterContext; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.filtersrv.FiltersrvController; +import org.apache.rocketmq.filtersrv.filter.FilterClassInfo; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.CommitLog; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class DefaultRequestProcessor implements NettyRequestProcessor { + private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + + private final 
FiltersrvController filtersrvController; + + + public DefaultRequestProcessor(FiltersrvController filtersrvController) { + this.filtersrvController = filtersrvController; + } + + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { + if (log.isDebugEnabled()) { + log.debug("receive request, {} {} {}", + request.getCode(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + request); + } + + switch (request.getCode()) { + case RequestCode.REGISTER_MESSAGE_FILTER_CLASS: + return registerMessageFilterClass(ctx, request); + case RequestCode.PULL_MESSAGE: + return pullMessageForward(ctx, request); + } + + return null; + } + + @Override + public boolean rejectRequest() { + return false; + } + + private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final RegisterMessageFilterClassRequestHeader requestHeader = + (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class); + + try { + boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(requestHeader.getConsumerGroup(), + requestHeader.getTopic(), + requestHeader.getClassName(), + requestHeader.getClassCRC(), + request.getBody()); + if (!ok) { + throw new Exception("registerFilterClass error"); + } + } catch (Exception e) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(RemotingHelper.exceptionSimpleDesc(e)); + return response; + } + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception { + final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); + final 
PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); + final PullMessageRequestHeader requestHeader = + (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); + + final FilterContext filterContext = new FilterContext(); + filterContext.setConsumerGroup(requestHeader.getConsumerGroup()); + + + response.setOpaque(request.getOpaque()); + + DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer(); + final FilterClassInfo findFilterClass = + this.filtersrvController.getFilterClassManager() + .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic()); + if (null == findFilterClass) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("Find Filter class failed, not registered"); + return response; + } + + if (null == findFilterClass.getMessageFilter()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("Find Filter class failed, registered but no class"); + return response; + } + + responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); + + + MessageQueue mq = new MessageQueue(); + mq.setTopic(requestHeader.getTopic()); + mq.setQueueId(requestHeader.getQueueId()); + mq.setBrokerName(this.filtersrvController.getBrokerName()); + long offset = requestHeader.getQueueOffset(); + int maxNums = requestHeader.getMaxMsgNums(); + + final PullCallback pullCallback = new PullCallback() { + + @Override + public void onSuccess(PullResult pullResult) { + responseHeader.setMaxOffset(pullResult.getMaxOffset()); + responseHeader.setMinOffset(pullResult.getMinOffset()); + responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset()); + response.setRemark(null); + + switch (pullResult.getPullStatus()) { + case FOUND: + response.setCode(ResponseCode.SUCCESS); + + List<MessageExt> msgListOK = new ArrayList<MessageExt>(); + try { + for (MessageExt msg : pullResult.getMsgFoundList()) { + boolean match = 
findFilterClass.getMessageFilter().match(msg, filterContext); + if (match) { + msgListOK.add(msg); + } + } + + + if (!msgListOK.isEmpty()) { + returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK); + return; + } else { + response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); + } + } catch (Throwable e) { + final String error = + String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ", + requestHeader.getConsumerGroup(), requestHeader.getTopic()); + log.error(error, e); + + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e)); + returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); + return; + } + + break; + case NO_MATCHED_MSG: + response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); + break; + case NO_NEW_MSG: + response.setCode(ResponseCode.PULL_NOT_FOUND); + break; + case OFFSET_ILLEGAL: + response.setCode(ResponseCode.PULL_OFFSET_MOVED); + break; + default: + break; + } + + returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); + } + + + @Override + public void onException(Throwable e) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e)); + returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); + return; + } + }; + + pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback); + + return null; + } + + private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response, + final List<MessageExt> msgList) { + if (null != msgList) { + ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()]; + int bodyTotalSize = 0; + for (int i = 0; i < msgList.size(); i++) { + try { + msgBufferList[i] = messageToByteBuffer(msgList.get(i)); + bodyTotalSize += 
msgBufferList[i].capacity(); + } catch (Exception e) { + log.error("messageToByteBuffer UnsupportedEncodingException", e); + } + } + + ByteBuffer body = ByteBuffer.allocate(bodyTotalSize); + for (ByteBuffer bb : msgBufferList) { + bb.flip(); + body.put(bb); + } + + response.setBody(body.array()); + + + this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, msgList.size()); + + this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize); + } + + try { + ctx.writeAndFlush(response).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + log.error("FilterServer response to " + future.channel().remoteAddress() + " failed", future.cause()); + log.error(response.toString()); + } + } + }); + } catch (Throwable e) { + log.error("FilterServer process request over, but response failed", e); + log.error(response.toString()); + } + } + + private ByteBuffer messageToByteBuffer(final MessageExt msg) throws IOException { + int sysFlag = MessageSysFlag.clearCompressedFlag(msg.getSysFlag()); + if (msg.getBody() != null) { + if (msg.getBody().length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch()) { + byte[] data = UtilAll.compress(msg.getBody(), this.filtersrvController.getFiltersrvConfig().getZipCompressLevel()); + if (data != null) { + msg.setBody(data); + sysFlag |= MessageSysFlag.COMPRESSED_FLAG; + } + } + } + + final int bodyLength = msg.getBody() != null ? 
msg.getBody().length : 0; + byte[] topicData = msg.getTopic().getBytes(MixAll.DEFAULT_CHARSET); + final int topicLength = topicData.length; + String properties = MessageDecoder.messageProperties2String(msg.getProperties()); + byte[] propertiesData = properties.getBytes(MixAll.DEFAULT_CHARSET); + final int propertiesLength = propertiesData.length; + final int msgLen = 4 // 1 TOTALSIZE + + 4 // 2 MAGICCODE + + 4 // 3 BODYCRC + + 4 // 4 QUEUEID + + 4 // 5 FLAG + + 8 // 6 QUEUEOFFSET + + 8 // 7 PHYSICALOFFSET + + 4 // 8 SYSFLAG + + 8 // 9 BORNTIMESTAMP + + 8 // 10 BORNHOST + + 8 // 11 STORETIMESTAMP + + 8 // 12 STOREHOSTADDRESS + + 4 // 13 RECONSUMETIMES + + 8 // 14 Prepared Transaction Offset + + 4 + bodyLength // 14 BODY + + 1 + topicLength // 15 TOPIC + + 2 + propertiesLength // 16 propertiesLength + + 0; + + ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen); + + final MessageExt msgInner = msg; + + // 1 TOTALSIZE + msgStoreItemMemory.putInt(msgLen); + // 2 MAGICCODE + msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); + // 3 BODYCRC + msgStoreItemMemory.putInt(UtilAll.crc32(msgInner.getBody())); + // 4 QUEUEID + msgStoreItemMemory.putInt(msgInner.getQueueId()); + // 5 FLAG + msgStoreItemMemory.putInt(msgInner.getFlag()); + // 6 QUEUEOFFSET + msgStoreItemMemory.putLong(msgInner.getQueueOffset()); + // 7 PHYSICALOFFSET + msgStoreItemMemory.putLong(msgInner.getCommitLogOffset()); + // 8 SYSFLAG + msgStoreItemMemory.putInt(sysFlag); + // 9 BORNTIMESTAMP + msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); + // 10 BORNHOST + msgStoreItemMemory.put(msgInner.getBornHostBytes()); + // 11 STORETIMESTAMP + msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); + // 12 STOREHOSTADDRESS + msgStoreItemMemory.put(msgInner.getStoreHostBytes()); + // 13 RECONSUMETIMES + msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); + // 14 Prepared Transaction Offset + msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); + // 15 BODY + 
msgStoreItemMemory.putInt(bodyLength); + if (bodyLength > 0) + msgStoreItemMemory.put(msgInner.getBody()); + // 16 TOPIC + msgStoreItemMemory.put((byte) topicLength); + msgStoreItemMemory.put(topicData); + // 17 PROPERTIES + msgStoreItemMemory.putShort((short) propertiesLength); + if (propertiesLength > 0) + msgStoreItemMemory.put(propertiesData); + + return msgStoreItemMemory; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java new file mode 100644 index 0000000..8665fbd --- /dev/null +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/stats/FilterServerStatsManager.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.filtersrv.stats; + +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.stats.StatsItemSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + + +public class FilterServerStatsManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + private final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread")); + + // ConsumerGroup Get Nums + private final StatsItemSet groupGetNums = new StatsItemSet("GROUP_GET_NUMS", + this.scheduledExecutorService, log); + + // ConsumerGroup Get Size + private final StatsItemSet groupGetSize = new StatsItemSet("GROUP_GET_SIZE", + this.scheduledExecutorService, log); + + + public FilterServerStatsManager() { + } + + + public void start() { + } + + + public void shutdown() { + this.scheduledExecutorService.shutdown(); + } + + + public void incGroupGetNums(final String group, final String topic, final int incValue) { + this.groupGetNums.addValue(topic + "@" + group, incValue, 1); + } + + + public void incGroupGetSize(final String group, final String topic, final int incValue) { + this.groupGetSize.addValue(topic + "@" + group, incValue, 1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/namesrv/pom.xml ---------------------------------------------------------------------- diff --git a/namesrv/pom.xml b/namesrv/pom.xml index 3494f8f..2ec2f5f 100644 --- a/namesrv/pom.xml +++ b/namesrv/pom.xml @@ -18,7 +18,7 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - 
<groupId>com.alibaba.rocketmq</groupId> + <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>4.0.0-SNAPSHOT</version> </parent> @@ -35,11 +35,11 @@ <scope>test</scope> </dependency> <dependency> - <groupId>com.alibaba.rocketmq</groupId> + <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> </dependency> <dependency> - <groupId>com.alibaba.rocketmq</groupId> + <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-tools</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java deleted file mode 100644 index 82f2622..0000000 --- a/namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.namesrv; - -import com.alibaba.rocketmq.common.Configuration; -import com.alibaba.rocketmq.common.ThreadFactoryImpl; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.namesrv.NamesrvConfig; -import com.alibaba.rocketmq.namesrv.kvconfig.KVConfigManager; -import com.alibaba.rocketmq.namesrv.processor.ClusterTestRequestProcessor; -import com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor; -import com.alibaba.rocketmq.namesrv.routeinfo.BrokerHousekeepingService; -import com.alibaba.rocketmq.namesrv.routeinfo.RouteInfoManager; -import com.alibaba.rocketmq.remoting.RemotingServer; -import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer; -import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - -/** - * @author shijia.wxr - */ -public class NamesrvController { - private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); - - private final NamesrvConfig namesrvConfig; - - private final NettyServerConfig nettyServerConfig; - - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "NSScheduledThread")); - private final KVConfigManager kvConfigManager; - private final RouteInfoManager routeInfoManager; - - private RemotingServer remotingServer; - - private BrokerHousekeepingService brokerHousekeepingService; - - private ExecutorService remotingExecutor; - - private Configuration configuration; - - - public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { - this.namesrvConfig = namesrvConfig; - this.nettyServerConfig = nettyServerConfig; - this.kvConfigManager = new KVConfigManager(this); - 
this.routeInfoManager = new RouteInfoManager(); - this.brokerHousekeepingService = new BrokerHousekeepingService(this); - this.configuration = new Configuration( - log, - this.namesrvConfig, this.nettyServerConfig - ); - this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); - } - - - public boolean initialize() { - - this.kvConfigManager.load(); - - this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); - - - this.remotingExecutor = - Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); - - this.registerProcessor(); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - NamesrvController.this.routeInfoManager.scanNotActiveBroker(); - } - }, 5, 10, TimeUnit.SECONDS); - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - NamesrvController.this.kvConfigManager.printAllPeriodically(); - } - }, 1, 10, TimeUnit.MINUTES); - - return true; - } - - - private void registerProcessor() { - if (namesrvConfig.isClusterTest()) { - - this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), - this.remotingExecutor); - } else { - - this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); - } - } - - - public void start() throws Exception { - this.remotingServer.start(); - } - - - public void shutdown() { - this.remotingServer.shutdown(); - this.remotingExecutor.shutdown(); - this.scheduledExecutorService.shutdown(); - } - - - public NamesrvConfig getNamesrvConfig() { - return namesrvConfig; - } - - - public NettyServerConfig getNettyServerConfig() { - return nettyServerConfig; - } - - - public KVConfigManager getKvConfigManager() { - return kvConfigManager; - } - - - public RouteInfoManager getRouteInfoManager() { - return 
routeInfoManager; - } - - - public RemotingServer getRemotingServer() { - return remotingServer; - } - - - public void setRemotingServer(RemotingServer remotingServer) { - this.remotingServer = remotingServer; - } - - public Configuration getConfiguration() { - return configuration; - } -}
