http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java b/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java new file mode 100644 index 0000000..cda7952 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/RandomAsyncCommit.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.simple; + +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + + +public class RandomAsyncCommit { + private final ConcurrentHashMap<MessageQueue, CachedQueue> mqCachedTable = + new ConcurrentHashMap<MessageQueue, CachedQueue>(); + + + public void putMessages(final MessageQueue mq, final List<MessageExt> msgs) { + CachedQueue cachedQueue = this.mqCachedTable.get(mq); + if (null == cachedQueue) { + cachedQueue = new CachedQueue(); + this.mqCachedTable.put(mq, cachedQueue); + } + for (MessageExt msg : msgs) { + cachedQueue.getMsgCachedTable().put(msg.getQueueOffset(), msg); + } + } + + + public void removeMessage(final MessageQueue mq, long offset) { + CachedQueue cachedQueue = this.mqCachedTable.get(mq); + if (null != cachedQueue) { + cachedQueue.getMsgCachedTable().remove(offset); + } + } + + + public long commitableOffset(final MessageQueue mq) { + CachedQueue cachedQueue = this.mqCachedTable.get(mq); + if (null != cachedQueue) { + return cachedQueue.getMsgCachedTable().firstKey(); + } + + return -1; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java new file mode 100644 index 0000000..0304a63 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class TestProducer { + public static void main(String[] args) throws MQClientException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); + producer.start(); + + for (int i = 0; i < 1; i++) + try { + { + Message msg = new Message("TopicTest1", + "TagA", + "key113", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + + QueryResult queryMessage = + producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis()); + for (MessageExt m : queryMessage.getMessageList()) { + System.out.printf("%s%n", m); + } + } + + } catch (Exception e) { + e.printStackTrace(); + } + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java new file mode 100644 index 0000000..fea93a8 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.transaction; + +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.concurrent.atomic.AtomicInteger; + + +public class TransactionCheckListenerImpl implements TransactionCheckListener { + private AtomicInteger transactionIndex = new AtomicInteger(0); + + + @Override + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { + System.out.printf("server checking TrMsg " + msg.toString() + "%n"); + + int value = transactionIndex.getAndIncrement(); + if ((value % 6) == 0) { + throw new RuntimeException("Could not find db"); + } else if ((value % 5) == 0) { + return LocalTransactionState.ROLLBACK_MESSAGE; + } else if ((value % 4) == 0) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + return LocalTransactionState.UNKNOW; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java new file mode 100644 index 0000000..eb787fd --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionExecuterImpl.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.transaction; + +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.common.message.Message; + +import java.util.concurrent.atomic.AtomicInteger; + +public class TransactionExecuterImpl implements LocalTransactionExecuter { + private AtomicInteger transactionIndex = new AtomicInteger(1); + + + @Override + public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { + int value = transactionIndex.getAndIncrement(); + + if (value == 0) { + throw new RuntimeException("Could not find db"); + } else if ((value % 5) == 0) { + return LocalTransactionState.ROLLBACK_MESSAGE; + } else if ((value % 4) == 0) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + return LocalTransactionState.UNKNOW; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java new file mode 100644 index 0000000..5a868c6 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.transaction; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +import java.io.UnsupportedEncodingException; + +public class TransactionProducer { + public static void main(String[] args) throws MQClientException, InterruptedException { + TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); + TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); + producer.setCheckThreadPoolMinSize(2); + producer.setCheckThreadPoolMaxSize(2); + producer.setCheckRequestHoldMax(2000); + producer.setTransactionCheckListener(transactionCheckListener); + producer.start(); + + String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; + TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); + for (int i = 0; i < 100; i++) { + try { + Message msg = + new Message("TopicTest", tags[i % tags.length], "KEY" + i, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); + System.out.printf("%s%n", sendResult); + + Thread.sleep(10); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } + + for (int i = 0; i < 100000; i++) { + Thread.sleep(1000); + } + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/resources/MessageFilterImpl.java ---------------------------------------------------------------------- diff --git a/example/src/main/resources/MessageFilterImpl.java b/example/src/main/resources/MessageFilterImpl.java index 2894a55..3ff3f48 100644 --- a/example/src/main/resources/MessageFilterImpl.java +++ b/example/src/main/resources/MessageFilterImpl.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package com.alibaba.rocketmq.example.filter; +package org.apache.rocketmq.example.filter; -import com.alibaba.rocketmq.common.filter.MessageFilter; -import com.alibaba.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.filter.MessageFilter; +import org.apache.rocketmq.common.message.MessageExt; public class MessageFilterImpl implements MessageFilter { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/pom.xml ---------------------------------------------------------------------- diff --git a/filtersrv/pom.xml b/filtersrv/pom.xml index 28c360b..bebd10a 100644 --- a/filtersrv/pom.xml +++ b/filtersrv/pom.xml @@ -18,7 +18,7 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <groupId>com.alibaba.rocketmq</groupId> + <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>4.0.0-SNAPSHOT</version> </parent> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java deleted file mode 100644 index b469b3f..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.filtersrv; - -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; -import com.alibaba.rocketmq.remoting.RemotingClient; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; -import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; -import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; -import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; - - -/** - * @author shijia.wxr - */ -public class FilterServerOuterAPI { - private final RemotingClient remotingClient; - - - public FilterServerOuterAPI() { - this.remotingClient = new NettyRemotingClient(new NettyClientConfig()); - } - - - public void start() { - this.remotingClient.start(); - } - - - public void shutdown() { - this.remotingClient.shutdown(); - } - - - public RegisterFilterServerResponseHeader registerFilterServerToBroker( - final String brokerAddr, - final String filterServerAddr - ) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, InterruptedException, MQBrokerException { - RegisterFilterServerRequestHeader requestHeader = new RegisterFilterServerRequestHeader(); - requestHeader.setFilterServerAddr(filterServerAddr); - RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - RegisterFilterServerResponseHeader responseHeader = - (RegisterFilterServerResponseHeader) response - .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class); - - return responseHeader; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java deleted file mode 100644 index fac620f..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.filtersrv; - -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.annotation.ImportantField; -import com.alibaba.rocketmq.remoting.common.RemotingUtil; - - -public class FiltersrvConfig { - private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, - System.getenv(MixAll.ROCKETMQ_HOME_ENV)); - - @ImportantField - private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, - System.getenv(MixAll.NAMESRV_ADDR_ENV)); - - private String connectWhichBroker = "127.0.0.1:10911"; - private String filterServerIP = RemotingUtil.getLocalAddress(); - - private int compressMsgBodyOverHowmuch = 1024 * 8; - private int zipCompressLevel = 5; - - - private boolean clientUploadFilterClassEnable = true; - - - private String filterClassRepertoryUrl = "http://fsrep.tbsite.net/filterclass"; - - private int fsServerAsyncSemaphoreValue = 2048; - private int fsServerCallbackExecutorThreads = 64; - private int fsServerWorkerThreads = 64; - - - public String getRocketmqHome() { - return rocketmqHome; - } - - - public void setRocketmqHome(String rocketmqHome) { - this.rocketmqHome = rocketmqHome; - } - - - public String getNamesrvAddr() { - return namesrvAddr; - } - - - public void setNamesrvAddr(String namesrvAddr) { - this.namesrvAddr = namesrvAddr; - } - - - public String getConnectWhichBroker() { - return connectWhichBroker; - } - - - public void setConnectWhichBroker(String connectWhichBroker) { - this.connectWhichBroker = connectWhichBroker; - } - - - public String getFilterServerIP() { - return filterServerIP; - } - - - public void setFilterServerIP(String filterServerIP) { - this.filterServerIP = filterServerIP; - } - - - public int getCompressMsgBodyOverHowmuch() { - return compressMsgBodyOverHowmuch; - } - - - public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) { - this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; - } - - - public int getZipCompressLevel() { - return zipCompressLevel; - } - - - public void setZipCompressLevel(int zipCompressLevel) { - this.zipCompressLevel = zipCompressLevel; - } - - - public boolean isClientUploadFilterClassEnable() { - return clientUploadFilterClassEnable; - } - - - public void setClientUploadFilterClassEnable(boolean clientUploadFilterClassEnable) { - this.clientUploadFilterClassEnable = clientUploadFilterClassEnable; - } - - - public String getFilterClassRepertoryUrl() { - return filterClassRepertoryUrl; - } - - - public void setFilterClassRepertoryUrl(String filterClassRepertoryUrl) { - this.filterClassRepertoryUrl = filterClassRepertoryUrl; - } - - - public int getFsServerAsyncSemaphoreValue() { - return fsServerAsyncSemaphoreValue; - } - - - public void setFsServerAsyncSemaphoreValue(int fsServerAsyncSemaphoreValue) { - this.fsServerAsyncSemaphoreValue = fsServerAsyncSemaphoreValue; - } - - - public int getFsServerCallbackExecutorThreads() { - return fsServerCallbackExecutorThreads; - } - - - public void setFsServerCallbackExecutorThreads(int fsServerCallbackExecutorThreads) { - this.fsServerCallbackExecutorThreads = fsServerCallbackExecutorThreads; - } - - - public int getFsServerWorkerThreads() { - return fsServerWorkerThreads; - } - - - public void setFsServerWorkerThreads(int fsServerWorkerThreads) { - this.fsServerWorkerThreads = fsServerWorkerThreads; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java deleted file mode 100644 index 0e3f696..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.filtersrv; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.ThreadFactoryImpl; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; -import com.alibaba.rocketmq.filtersrv.filter.FilterClassManager; -import com.alibaba.rocketmq.filtersrv.processor.DefaultRequestProcessor; -import com.alibaba.rocketmq.filtersrv.stats.FilterServerStatsManager; -import com.alibaba.rocketmq.remoting.RemotingServer; -import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer; -import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - -/** - * @author shijia.wxr - */ -public class FiltersrvController { - private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - - private final FiltersrvConfig filtersrvConfig; - - private final NettyServerConfig nettyServerConfig; - private final FilterClassManager filterClassManager; - - private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI(); - private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer( - MixAll.FILTERSRV_CONSUMER_GROUP); - - private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread")); - private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager(); - - private RemotingServer remotingServer; - - private ExecutorService remotingExecutor; - private volatile String brokerName = null; - - - public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) { - this.filtersrvConfig = filtersrvConfig; - this.nettyServerConfig = nettyServerConfig; - this.filterClassManager = new FilterClassManager(this); - } - - - public boolean initialize() { - - MixAll.printObjectProperties(log, this.filtersrvConfig); - - - this.remotingServer = new NettyRemotingServer(this.nettyServerConfig); - - - this.remotingExecutor = - Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), - new ThreadFactoryImpl("RemotingExecutorThread_")); - - this.registerProcessor(); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - FiltersrvController.this.registerFilterServerToBroker(); - } - }, 3, 10, TimeUnit.SECONDS); - - this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer - .getBrokerSuspendMaxTimeMillis() - 1000); - this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer - .getConsumerTimeoutMillisWhenSuspend() - 1000); - - this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr()); - this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid())); - - return true; - } - - private void registerProcessor() { - this.remotingServer - .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); - } - - public void registerFilterServerToBroker() { - try { - RegisterFilterServerResponseHeader responseHeader = - this.filterServerOuterAPI.registerFilterServerToBroker( - this.filtersrvConfig.getConnectWhichBroker(), this.localAddr()); - this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() - .setDefaultBrokerId(responseHeader.getBrokerId()); - - if (null == this.brokerName) { - this.brokerName = responseHeader.getBrokerName(); - } - - log.info("register filter server<{}> to broker<{}> OK, Return: {} {}", - this.localAddr(), - this.filtersrvConfig.getConnectWhichBroker(), - responseHeader.getBrokerName(), - responseHeader.getBrokerId()); - } catch (Exception e) { - log.warn("register filter server Exception", e); - - log.warn("access broker failed, kill oneself"); - System.exit(-1); - } - } - - public String localAddr() { - return String.format("%s:%d", this.filtersrvConfig.getFilterServerIP(), - this.remotingServer.localListenPort()); - } - - public void start() throws Exception { - this.defaultMQPullConsumer.start(); - this.remotingServer.start(); - this.filterServerOuterAPI.start(); - this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() - .setConnectBrokerByUser(true); - this.filterClassManager.start(); - this.filterServerStatsManager.start(); - } - - - public void shutdown() { - this.remotingServer.shutdown(); - this.remotingExecutor.shutdown(); - this.scheduledExecutorService.shutdown(); - this.defaultMQPullConsumer.shutdown(); - this.filterServerOuterAPI.shutdown(); - this.filterClassManager.shutdown(); - this.filterServerStatsManager.shutdown(); - } - - - public RemotingServer getRemotingServer() { - return remotingServer; - } - - - public void setRemotingServer(RemotingServer remotingServer) { - this.remotingServer = remotingServer; - } - - - public ExecutorService getRemotingExecutor() { - return remotingExecutor; - } - - - public void setRemotingExecutor(ExecutorService remotingExecutor) { - this.remotingExecutor = remotingExecutor; - } - - - public FiltersrvConfig getFiltersrvConfig() { - return filtersrvConfig; - } - - - public NettyServerConfig getNettyServerConfig() { - return nettyServerConfig; - } - - - public ScheduledExecutorService getScheduledExecutorService() { - return scheduledExecutorService; - } - - - public FilterServerOuterAPI getFilterServerOuterAPI() { - return filterServerOuterAPI; - } - - - public FilterClassManager getFilterClassManager() { - return filterClassManager; - } - - - public DefaultMQPullConsumer getDefaultMQPullConsumer() { - return defaultMQPullConsumer; - } - - - public String getBrokerName() { - return brokerName; - } - - - public void setBrokerName(String brokerName) { - this.brokerName = brokerName; - } - - - public FilterServerStatsManager getFilterServerStatsManager() { - return filterServerStatsManager; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java deleted file mode 100644 index 3fe6b22..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.filtersrv; - -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.joran.JoranConfigurator; -import com.alibaba.rocketmq.common.MQVersion; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; -import com.alibaba.rocketmq.remoting.netty.NettySystemConfig; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import com.alibaba.rocketmq.srvutil.ServerUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - - -/** - * @author shijia.wxr - */ -public class FiltersrvStartup { - public static Logger log; - - public static void main(String[] args) { - start(createController(args)); - } - - public static FiltersrvController start(FiltersrvController controller) { - - try { - controller.start(); - } catch (Exception e) { - e.printStackTrace(); - System.exit(-1); - } - - String tip = "The Filter Server boot success, " + controller.localAddr(); - log.info(tip); - System.out.printf("%s%n", tip); - - return controller; - } - - public static FiltersrvController createController(String[] args) { - System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); - - - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { - NettySystemConfig.socketSndbufSize = 65535; - } - - - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { - NettySystemConfig.socketRcvbufSize = 1024; - } - - try { - Options options = ServerUtil.buildCommandlineOptions(new Options()); - final CommandLine commandLine = - ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options), - new PosixParser()); - if (null == commandLine) { - System.exit(-1); - return null; - } - - final FiltersrvConfig filtersrvConfig = new FiltersrvConfig(); - final NettyServerConfig nettyServerConfig = new NettyServerConfig(); - - if (commandLine.hasOption('c')) { - String file = commandLine.getOptionValue('c'); - if (file != null) { - InputStream in = new BufferedInputStream(new FileInputStream(file)); - Properties properties = new Properties(); - properties.load(in); - MixAll.properties2Object(properties, filtersrvConfig); - System.out.printf("load config properties file OK, " + file + "%n"); - in.close(); - - String port = properties.getProperty("listenPort"); - if (port != null) { - filtersrvConfig.setConnectWhichBroker(String.format("127.0.0.1:%s", port)); - } - } - } - - nettyServerConfig.setListenPort(0); - nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue()); - nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig - .getFsServerCallbackExecutorThreads()); - nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads()); - - if (commandLine.hasOption('p')) { - MixAll.printObjectProperties(null, filtersrvConfig); - MixAll.printObjectProperties(null, nettyServerConfig); - System.exit(0); - } - - MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig); - if (null == filtersrvConfig.getRocketmqHome()) { - System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV - + " variable in your environment to match the location of the RocketMQ installation%n"); - System.exit(-2); - } - - LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); - JoranConfigurator configurator = new JoranConfigurator(); - configurator.setContext(lc); - lc.reset(); - configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml"); - log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - - final FiltersrvController controller = - new FiltersrvController(filtersrvConfig, nettyServerConfig); - boolean initResult = controller.initialize(); - if (!initResult) { - controller.shutdown(); - System.exit(-3); - } - - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - private volatile boolean hasShutdown = false; - private AtomicInteger shutdownTimes = new AtomicInteger(0); - - @Override - public void run() { - synchronized (this) { - log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); - if (!this.hasShutdown) { - this.hasShutdown = true; - long begineTime = System.currentTimeMillis(); - controller.shutdown(); - long consumingTimeTotal = System.currentTimeMillis() - begineTime; - log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); - } - } - } - }, "ShutdownHook")); - - return controller; - } catch (Throwable e) { - e.printStackTrace(); - System.exit(-1); - } - return null; - } - - public static Options buildCommandlineOptions(final Options options) { - Option opt = new Option("c", "configFile", true, "Filter server config properties file"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("p", "printConfigItem", false, "Print all config item"); - opt.setRequired(false); - options.addOption(opt); - - return options; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java deleted file mode 100644 index e17e5d2..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java +++ /dev/null @@ -1,393 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.filtersrv.filter; - -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.filter.FilterAPI; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.tools.JavaCompiler; -import javax.tools.ToolProvider; -import java.io.*; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.net.URLDecoder; -import java.util.*; - - -public class DynaCode { - private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - - private static final String FILE_SP = System.getProperty("file.separator"); - - private static final String LINE_SP = System.getProperty("line.separator"); - - private String sourcePath = System.getProperty("user.home") + FILE_SP + "rocketmq_filter_class" + FILE_SP - + UtilAll.getPid(); - - private String outPutClassPath = sourcePath; - - - private ClassLoader parentClassLoader; - - - private List<String> codeStrs; - - - private Map<String/* fullClassName */, Class<?>/* class */> loadClass; - - - private String classpath; - - - private String bootclasspath; - - - private String extdirs; - - - private String encoding = "UTF-8"; - - - private String target; - - - @SuppressWarnings("unchecked") - public DynaCode(String code) { - this(Thread.currentThread().getContextClassLoader(), Arrays.asList(code)); - } - - - public DynaCode(ClassLoader parentClassLoader, List<String> codeStrs) { - this(extractClasspath(parentClassLoader), parentClassLoader, codeStrs); - } - - - public DynaCode(String classpath, ClassLoader parentClassLoader, List<String> codeStrs) { - this.classpath = classpath; - this.parentClassLoader = parentClassLoader; - this.codeStrs = codeStrs; - this.loadClass = new HashMap<String, Class<?>>(codeStrs.size()); - } - - - private static String extractClasspath(ClassLoader cl) { - StringBuffer buf = new StringBuffer(); - while (cl != null) { - if (cl instanceof URLClassLoader) { - URL urls[] = ((URLClassLoader) cl).getURLs(); - for (int i = 0; i < urls.length; i++) { - if (buf.length() > 0) { - buf.append(File.pathSeparatorChar); - } - String s = urls[i].getFile(); - try { - s = URLDecoder.decode(s, "UTF-8"); - } catch (UnsupportedEncodingException e) { - continue; - } - File f = new File(s); - buf.append(f.getAbsolutePath()); - } - } - cl = cl.getParent(); - } - return buf.toString(); - } - - - public DynaCode(List<String> codeStrs) { - this(Thread.currentThread().getContextClassLoader(), codeStrs); - } - - public static Class<?> compileAndLoadClass(final String className, final String javaSource) - throws Exception { - String classSimpleName = FilterAPI.simpleClassName(className); - String javaCode = javaSource; - - final String newClassSimpleName = classSimpleName + System.currentTimeMillis(); - String newJavaCode = javaCode.replaceAll(classSimpleName, newClassSimpleName); - - List<String> codes = new ArrayList<String>(); - codes.add(newJavaCode); - DynaCode dc = new DynaCode(codes); - dc.compileAndLoadClass(); - Map<String, Class<?>> map = dc.getLoadClass(); - - Class<?> clazz = map.get(getQualifiedName(newJavaCode)); - return clazz; - } - - public void compileAndLoadClass() throws Exception { - String[] sourceFiles = this.uploadSrcFile(); - this.compile(sourceFiles); - this.loadClass(this.loadClass.keySet()); - } - - public Map<String, Class<?>> getLoadClass() { - return loadClass; - } - - public static String getQualifiedName(String code) { - StringBuilder sb = new StringBuilder(); - String className = getClassName(code); - if (StringUtils.isNotBlank(className)) { - - String packageName = getPackageName(code); - if (StringUtils.isNotBlank(packageName)) { - sb.append(packageName).append("."); - } - sb.append(className); - } - return sb.toString(); - } - - private String[] uploadSrcFile() throws Exception { - List<String> srcFileAbsolutePaths = new ArrayList<String>(codeStrs.size()); - for (String code : codeStrs) { - if (StringUtils.isNotBlank(code)) { - String packageName = getPackageName(code); - String className = getClassName(code); - if (StringUtils.isNotBlank(className)) { - File srcFile = null; - BufferedWriter bufferWriter = null; - try { - if (StringUtils.isBlank(packageName)) { - File pathFile = new File(sourcePath); - - if (!pathFile.exists()) { - if (!pathFile.mkdirs()) { - throw new RuntimeException("create PathFile Error!"); - } - } - srcFile = new File(sourcePath + FILE_SP + className + ".java"); - } else { - String srcPath = StringUtils.replace(packageName, ".", FILE_SP); - File pathFile = new File(sourcePath + FILE_SP + srcPath); - - if (!pathFile.exists()) { - if (!pathFile.mkdirs()) { - throw new RuntimeException("create PathFile Error!"); - } - } - srcFile = new File(pathFile.getAbsolutePath() + FILE_SP + className + ".java"); - } - synchronized (loadClass) { - loadClass.put(getFullClassName(code), null); - } - if (null != srcFile) { - LOGGER.warn("Dyna Create Java Source File:---->" + srcFile.getAbsolutePath()); - srcFileAbsolutePaths.add(srcFile.getAbsolutePath()); - srcFile.deleteOnExit(); - } - OutputStreamWriter outputStreamWriter = - new OutputStreamWriter(new FileOutputStream(srcFile), encoding); - bufferWriter = new BufferedWriter(outputStreamWriter); - for (String lineCode : code.split(LINE_SP)) { - bufferWriter.write(lineCode); - bufferWriter.newLine(); - } - bufferWriter.flush(); - } finally { - if (null != bufferWriter) { - bufferWriter.close(); - } - } - } - } - } - return srcFileAbsolutePaths.toArray(new String[srcFileAbsolutePaths.size()]); - } - - private void compile(String[] srcFiles) throws Exception { - String args[] = this.buildCompileJavacArgs(srcFiles); - ByteArrayOutputStream err = new ByteArrayOutputStream(); - JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); - if (compiler == null) { - throw new NullPointerException( - "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!"); - } - int resultCode = compiler.run(null, null, err, args); - if (resultCode != 0) { - throw new Exception(err.toString(RemotingHelper.DEFAULT_CHARSET)); - } - } - - private void loadClass(Set<String> classFullNames) throws ClassNotFoundException, MalformedURLException { - synchronized (loadClass) { - ClassLoader classLoader = - new URLClassLoader(new URL[]{new File(outPutClassPath).toURI().toURL()}, - parentClassLoader); - for (String key : classFullNames) { - Class<?> classz = classLoader.loadClass(key); - if (null != classz) { - loadClass.put(key, classz); - LOGGER.info("Dyna Load Java Class File OK:----> className: " + key); - } else { - LOGGER.error("Dyna Load Java Class File Fail:----> className: " + key); - } - } - } - } - - public static String getClassName(String code) { - String className = StringUtils.substringBefore(code, "{"); - if (StringUtils.isBlank(className)) { - return className; - } - if (StringUtils.contains(code, " class ")) { - className = StringUtils.substringAfter(className, " class "); - if (StringUtils.contains(className, " extends ")) { - className = StringUtils.substringBefore(className, " extends ").trim(); - } else if (StringUtils.contains(className, " implements ")) { - className = StringUtils.trim(StringUtils.substringBefore(className, " implements ")); - } else { - className = StringUtils.trim(className); - } - } else if (StringUtils.contains(code, " interface ")) { - className = StringUtils.substringAfter(className, " interface "); - if (StringUtils.contains(className, " extends ")) { - className = StringUtils.substringBefore(className, " extends ").trim(); - } else { - className = StringUtils.trim(className); - } - } else if (StringUtils.contains(code, " enum ")) { - className = StringUtils.trim(StringUtils.substringAfter(className, " enum ")); - } else { - return StringUtils.EMPTY; - } - return className; - } - - public static String getPackageName(String code) { - String packageName = - StringUtils.substringBefore(StringUtils.substringAfter(code, "package "), ";").trim(); - return packageName; - } - - public static String getFullClassName(String code) { - String packageName = getPackageName(code); - String className = getClassName(code); - return StringUtils.isBlank(packageName) ? className : packageName + "." + className; - } - - private String[] buildCompileJavacArgs(String srcFiles[]) { - ArrayList<String> args = new ArrayList<String>(); - if (StringUtils.isNotBlank(classpath)) { - args.add("-classpath"); - args.add(classpath); - } - if (StringUtils.isNotBlank(outPutClassPath)) { - args.add("-d"); - args.add(outPutClassPath); - } - if (StringUtils.isNotBlank(sourcePath)) { - args.add("-sourcepath"); - args.add(sourcePath); - } - if (StringUtils.isNotBlank(bootclasspath)) { - args.add("-bootclasspath"); - args.add(bootclasspath); - } - if (StringUtils.isNotBlank(extdirs)) { - args.add("-extdirs"); - args.add(extdirs); - } - if (StringUtils.isNotBlank(encoding)) { - args.add("-encoding"); - args.add(encoding); - } - if (StringUtils.isNotBlank(target)) { - args.add("-target"); - args.add(target); - } - for (int i = 0; i < srcFiles.length; i++) { - args.add(srcFiles[i]); - } - return args.toArray(new String[args.size()]); - } - - public String getOutPutClassPath() { - return outPutClassPath; - } - - public void setOutPutClassPath(String outPutClassPath) { - this.outPutClassPath = outPutClassPath; - } - - public String getSourcePath() { - return sourcePath; - } - - public void setSourcePath(String sourcePath) { - this.sourcePath = sourcePath; - } - - public ClassLoader getParentClassLoader() { - return parentClassLoader; - } - - public void setParentClassLoader(ClassLoader parentClassLoader) { - this.parentClassLoader = parentClassLoader; - } - - public String getClasspath() { - return classpath; - } - - public void setClasspath(String classpath) { - this.classpath = classpath; - } - - public String getBootclasspath() { - return bootclasspath; - } - - public void setBootclasspath(String bootclasspath) { - this.bootclasspath = bootclasspath; - } - - public String getExtdirs() { - return extdirs; - } - - public void setExtdirs(String extdirs) { - this.extdirs = extdirs; - } - - public String getEncoding() { - return encoding; - } - - public void setEncoding(String encoding) { - this.encoding = encoding; - } - - public String getTarget() { - return target; - } - - public void setTarget(String target) { - this.target = target; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java deleted file mode 100644 index 27b19ce..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.filtersrv.filter; - -public interface FilterClassFetchMethod { - public String fetch(final String topic, final String consumerGroup, final String className); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java deleted file mode 100644 index f3e747e..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.filtersrv.filter; - -import com.alibaba.rocketmq.common.filter.MessageFilter; - - -public class FilterClassInfo { - private String className; - private int classCRC; - private MessageFilter messageFilter; - - - public int getClassCRC() { - return classCRC; - } - - - public void setClassCRC(int classCRC) { - this.classCRC = classCRC; - } - - - public MessageFilter getMessageFilter() { - return messageFilter; - } - - - public void setMessageFilter(MessageFilter messageFilter) { - this.messageFilter = messageFilter; - } - - - public String getClassName() { - return className; - } - - - public void setClassName(String className) { - this.className = className; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java deleted file mode 100644 index 8966ca2..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.filtersrv.filter; - -public class FilterClassLoader extends ClassLoader { - public final Class<?> createNewClass(String name, byte[] b, int off, int len) throws ClassFormatError { - return this.defineClass(name, b, off, len); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassManager.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassManager.java deleted file mode 100644 index 618db8e..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassManager.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.filtersrv.filter; - -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.ThreadFactoryImpl; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.filter.MessageFilter; -import com.alibaba.rocketmq.filtersrv.FiltersrvController; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - -public class FilterClassManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - - private final Object compileLock = new Object(); - private final FiltersrvController filtersrvController; - - private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread")); - private ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable = - new ConcurrentHashMap<String, FilterClassInfo>(128); - private FilterClassFetchMethod filterClassFetchMethod; - - - public FilterClassManager(FiltersrvController filtersrvController) { - this.filtersrvController = filtersrvController; - this.filterClassFetchMethod = - new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig() - .getFilterClassRepertoryUrl()); - } - - - public void start() { - if (!this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - fetchClassFromRemoteHost(); - } - }, 1, 1, TimeUnit.MINUTES); - } - } - - private void fetchClassFromRemoteHost() { - Iterator<Entry<String, FilterClassInfo>> it = this.filterClassTable.entrySet().iterator(); - while (it.hasNext()) { - try { - Entry<String, FilterClassInfo> next = it.next(); - FilterClassInfo filterClassInfo = next.getValue(); - String[] topicAndGroup = next.getKey().split("@"); - String responseStr = - this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1], - filterClassInfo.getClassName()); - byte[] filterSourceBinary = responseStr.getBytes("UTF-8"); - int classCRC = UtilAll.crc32(responseStr.getBytes("UTF-8")); - if (classCRC != filterClassInfo.getClassCRC()) { - String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); - Class<?> newClass = - DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource); - Object newInstance = newClass.newInstance(); - filterClassInfo.setMessageFilter((MessageFilter) newInstance); - filterClassInfo.setClassCRC(classCRC); - - log.info("fetch Remote class File OK, {} {}", next.getKey(), - filterClassInfo.getClassName()); - } - } catch (Exception e) { - log.error("fetchClassFromRemoteHost Exception", e); - } - } - } - - public void shutdown() { - this.scheduledExecutorService.shutdown(); - } - - public boolean registerFilterClass(final String consumerGroup, final String topic, - final String className, final int classCRC, final byte[] filterSourceBinary) { - final String key = buildKey(consumerGroup, topic); - - - boolean registerNew = false; - FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key); - if (null == filterClassInfoPrev) { - registerNew = true; - } else { - if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { - if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { - registerNew = true; - } - } - } - - if (registerNew) { - synchronized (this.compileLock) { - filterClassInfoPrev = this.filterClassTable.get(key); - if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) { - return true; - } - - try { - - FilterClassInfo filterClassInfoNew = new FilterClassInfo(); - filterClassInfoNew.setClassName(className); - filterClassInfoNew.setClassCRC(0); - filterClassInfoNew.setMessageFilter(null); - - if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { - String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); - Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource); - Object newInstance = newClass.newInstance(); - filterClassInfoNew.setMessageFilter((MessageFilter) newInstance); - filterClassInfoNew.setClassCRC(classCRC); - } - - this.filterClassTable.put(key, filterClassInfoNew); - } catch (Throwable e) { - String info = - String - .format( - "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s", - consumerGroup, topic, className); - log.error(info, e); - return false; - } - } - } - - return true; - } - - private static String buildKey(final String consumerGroup, final String topic) { - return topic + "@" + consumerGroup; - } - - public FilterClassInfo findFilterClass(final String consumerGroup, final String topic) { - return this.filterClassTable.get(buildKey(consumerGroup, topic)); - } - - - public FilterClassFetchMethod getFilterClassFetchMethod() { - return filterClassFetchMethod; - } - - - public void setFilterClassFetchMethod(FilterClassFetchMethod filterClassFetchMethod) { - this.filterClassFetchMethod = filterClassFetchMethod; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java deleted file mode 100644 index 88cb572..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.filtersrv.filter; - -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.utils.HttpTinyClient; -import com.alibaba.rocketmq.common.utils.HttpTinyClient.HttpResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class HttpFilterClassFetchMethod implements FilterClassFetchMethod { - private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - private final String url; - - - public HttpFilterClassFetchMethod(String url) { - this.url = url; - } - - - @Override - public String fetch(String topic, String consumerGroup, String className) { - String thisUrl = String.format("%s/%s.java", this.url, className); - - try { - HttpResult result = HttpTinyClient.httpGet(thisUrl, null, null, "UTF-8", 5000); - if (200 == result.code) { - return result.content; - } - } catch (Exception e) { - log.error( - String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e); - } - - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java deleted file mode 100644 index 105cfff..0000000 --- a/filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java +++ /dev/null @@ -1,355 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.filtersrv.processor; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; -import com.alibaba.rocketmq.client.consumer.PullCallback; -import com.alibaba.rocketmq.client.consumer.PullResult; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.filter.FilterContext; -import com.alibaba.rocketmq.common.message.MessageDecoder; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; -import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; -import com.alibaba.rocketmq.filtersrv.FiltersrvController; -import com.alibaba.rocketmq.filtersrv.filter.FilterClassInfo; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import com.alibaba.rocketmq.store.CommitLog; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class DefaultRequestProcessor implements NettyRequestProcessor { - private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); - - private final FiltersrvController filtersrvController; - - - public DefaultRequestProcessor(FiltersrvController filtersrvController) { - this.filtersrvController = filtersrvController; - } - - - @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { - if (log.isDebugEnabled()) { - log.debug("receive request, {} {} {}", - request.getCode(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - request); - } - - switch (request.getCode()) { - case RequestCode.REGISTER_MESSAGE_FILTER_CLASS: - return registerMessageFilterClass(ctx, request); - case RequestCode.PULL_MESSAGE: - return pullMessageForward(ctx, request); - } - - return null; - } - - @Override - public boolean rejectRequest() { - return false; - } - - private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final RegisterMessageFilterClassRequestHeader requestHeader = - (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class); - - try { - boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(requestHeader.getConsumerGroup(), - requestHeader.getTopic(), - requestHeader.getClassName(), - requestHeader.getClassCRC(), - request.getBody()); - if (!ok) { - throw new Exception("registerFilterClass error"); - } - } catch (Exception e) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(RemotingHelper.exceptionSimpleDesc(e)); - return response; - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception { - final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); - final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); - final PullMessageRequestHeader requestHeader = - (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); - - final FilterContext filterContext = new FilterContext(); - filterContext.setConsumerGroup(requestHeader.getConsumerGroup()); - - - response.setOpaque(request.getOpaque()); - - DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer(); - final FilterClassInfo findFilterClass = - this.filtersrvController.getFilterClassManager() - .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic()); - if (null == findFilterClass) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("Find Filter class failed, not registered"); - return response; - } - - if (null == findFilterClass.getMessageFilter()) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("Find Filter class failed, registered but no class"); - return response; - } - - responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); - - - MessageQueue mq = new MessageQueue(); - mq.setTopic(requestHeader.getTopic()); - mq.setQueueId(requestHeader.getQueueId()); - mq.setBrokerName(this.filtersrvController.getBrokerName()); - long offset = requestHeader.getQueueOffset(); - int maxNums = requestHeader.getMaxMsgNums(); - - final PullCallback pullCallback = new PullCallback() { - - @Override - public void onSuccess(PullResult pullResult) { - responseHeader.setMaxOffset(pullResult.getMaxOffset()); - responseHeader.setMinOffset(pullResult.getMinOffset()); - responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset()); - response.setRemark(null); - - switch (pullResult.getPullStatus()) { - case FOUND: - response.setCode(ResponseCode.SUCCESS); - - List<MessageExt> msgListOK = new ArrayList<MessageExt>(); - try { - for (MessageExt msg : pullResult.getMsgFoundList()) { - boolean match = findFilterClass.getMessageFilter().match(msg, filterContext); - if (match) { - msgListOK.add(msg); - } - } - - - if (!msgListOK.isEmpty()) { - returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK); - return; - } else { - response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); - } - } catch (Throwable e) { - final String error = - String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ", - requestHeader.getConsumerGroup(), requestHeader.getTopic()); - log.error(error, e); - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e)); - returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); - return; - } - - break; - case NO_MATCHED_MSG: - response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); - break; - case NO_NEW_MSG: - response.setCode(ResponseCode.PULL_NOT_FOUND); - break; - case OFFSET_ILLEGAL: - response.setCode(ResponseCode.PULL_OFFSET_MOVED); - break; - default: - break; - } - - returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); - } - - - @Override - public void onException(Throwable e) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e)); - returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); - return; - } - }; - - pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback); - - return null; - } - - private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response, - final List<MessageExt> msgList) { - if (null != msgList) { - ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()]; - int bodyTotalSize = 0; - for (int i = 0; i < msgList.size(); i++) { - try { - msgBufferList[i] = messageToByteBuffer(msgList.get(i)); - bodyTotalSize += msgBufferList[i].capacity(); - } catch (Exception e) { - log.error("messageToByteBuffer UnsupportedEncodingException", e); - } - } - - ByteBuffer body = ByteBuffer.allocate(bodyTotalSize); - for (ByteBuffer bb : msgBufferList) { - bb.flip(); - body.put(bb); - } - - response.setBody(body.array()); - - - this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, msgList.size()); - - this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize); - } - - try { - ctx.writeAndFlush(response).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - log.error("FilterServer response to " + future.channel().remoteAddress() + " failed", future.cause()); - log.error(response.toString()); - } - } - }); - } catch (Throwable e) { - log.error("FilterServer process request over, but response failed", e); - log.error(response.toString()); - } - } - - private ByteBuffer messageToByteBuffer(final MessageExt msg) throws IOException { - int sysFlag = MessageSysFlag.clearCompressedFlag(msg.getSysFlag()); - if (msg.getBody() != null) { - if (msg.getBody().length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch()) { - byte[] data = UtilAll.compress(msg.getBody(), this.filtersrvController.getFiltersrvConfig().getZipCompressLevel()); - if (data != null) { - msg.setBody(data); - sysFlag |= MessageSysFlag.COMPRESSED_FLAG; - } - } - } - - final int bodyLength = msg.getBody() != null ? msg.getBody().length : 0; - byte[] topicData = msg.getTopic().getBytes(MixAll.DEFAULT_CHARSET); - final int topicLength = topicData.length; - String properties = MessageDecoder.messageProperties2String(msg.getProperties()); - byte[] propertiesData = properties.getBytes(MixAll.DEFAULT_CHARSET); - final int propertiesLength = propertiesData.length; - final int msgLen = 4 // 1 TOTALSIZE - + 4 // 2 MAGICCODE - + 4 // 3 BODYCRC - + 4 // 4 QUEUEID - + 4 // 5 FLAG - + 8 // 6 QUEUEOFFSET - + 8 // 7 PHYSICALOFFSET - + 4 // 8 SYSFLAG - + 8 // 9 BORNTIMESTAMP - + 8 // 10 BORNHOST - + 8 // 11 STORETIMESTAMP - + 8 // 12 STOREHOSTADDRESS - + 4 // 13 RECONSUMETIMES - + 8 // 14 Prepared Transaction Offset - + 4 + bodyLength // 14 BODY - + 1 + topicLength // 15 TOPIC - + 2 + propertiesLength // 16 propertiesLength - + 0; - - ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen); - - final MessageExt msgInner = msg; - - // 1 TOTALSIZE - msgStoreItemMemory.putInt(msgLen); - // 2 MAGICCODE - msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); - // 3 BODYCRC - msgStoreItemMemory.putInt(UtilAll.crc32(msgInner.getBody())); - // 4 QUEUEID - msgStoreItemMemory.putInt(msgInner.getQueueId()); - // 5 FLAG - msgStoreItemMemory.putInt(msgInner.getFlag()); - // 6 QUEUEOFFSET - msgStoreItemMemory.putLong(msgInner.getQueueOffset()); - // 7 PHYSICALOFFSET - msgStoreItemMemory.putLong(msgInner.getCommitLogOffset()); - // 8 SYSFLAG - msgStoreItemMemory.putInt(sysFlag); - // 9 BORNTIMESTAMP - msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); - // 10 BORNHOST - msgStoreItemMemory.put(msgInner.getBornHostBytes()); - // 11 STORETIMESTAMP - msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); - // 12 STOREHOSTADDRESS - msgStoreItemMemory.put(msgInner.getStoreHostBytes()); - // 13 RECONSUMETIMES - msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); - // 14 Prepared Transaction Offset - msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); - // 15 BODY - msgStoreItemMemory.putInt(bodyLength); - if (bodyLength > 0) - msgStoreItemMemory.put(msgInner.getBody()); - // 16 TOPIC - msgStoreItemMemory.put((byte) topicLength); - msgStoreItemMemory.put(topicData); - // 17 PROPERTIES - msgStoreItemMemory.putShort((short) propertiesLength); - if (propertiesLength > 0) - msgStoreItemMemory.put(propertiesData); - - return msgStoreItemMemory; - } -}
