http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java new file mode 100644 index 0000000..25d668c --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.example.simple; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.message.MessageQueue; + +public class PullConsumerTest { + public static void main(String[] args) throws MQClientException { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); + consumer.start(); + + try { + MessageQueue mq = new MessageQueue(); + mq.setQueueId(0); + mq.setTopic("TopicTest3"); + mq.setBrokerName("vivedeMacBook-Pro.local"); + + long offset = 26; + + long beginTime = System.currentTimeMillis(); + PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32); + System.out.printf("%s%n", System.currentTimeMillis() - beginTime); + System.out.printf("%s%n", pullResult); + } catch (Exception e) { + e.printStackTrace(); + } + + consumer.shutdown(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java new file mode 100644 index 0000000..0c86cf8 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.example.simple; + +import com.alibaba.rocketmq.client.consumer.MQPullConsumer; +import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService; +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.client.consumer.PullTaskCallback; +import com.alibaba.rocketmq.client.consumer.PullTaskContext; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; + + +public class PullScheduleService { + + public static void main(String[] args) throws MQClientException { + final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); + + scheduleService.setMessageModel(MessageModel.CLUSTERING); + scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() { + + @Override + public void doPullTask(MessageQueue mq, PullTaskContext context) { + MQPullConsumer consumer = context.getPullConsumer(); + try { + + long offset = consumer.fetchConsumeOffset(mq, false); + if (offset < 0) + offset = 0; + + PullResult pullResult = consumer.pull(mq, "*", offset, 32); + System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult); + switch (pullResult.getPullStatus()) { + case FOUND: + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + case OFFSET_ILLEGAL: + break; + default: + break; + } + consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); + + + context.setPullNextDelayTimeMillis(100); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + scheduleService.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java new file mode 100644 index 0000000..5628ced --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.example.simple; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.message.MessageExt; + +import java.util.List; + + +public class PushConsumer { + + public static void main(String[] args) throws InterruptedException, MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); + consumer.subscribe("Jodie_topic_1023", "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + consumer.registerMessageListener(new MessageListenerConcurrently() { + + /** + + */ + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.start(); + System.out.printf("Consumer Started.%n"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java new file mode 100644 index 0000000..fc6bacd --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package com.alibaba.rocketmq.example.simple;

import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Per-queue cache of in-flight messages keyed by queue offset, used to work
 * out the lowest offset that is still outstanding for a queue.
 */
public class RandomAsyncCommit {
    private final ConcurrentHashMap<MessageQueue, CachedQueue> mqCachedTable =
        new ConcurrentHashMap<MessageQueue, CachedQueue>();


    /**
     * Caches {@code msgs} under {@code mq}, creating the per-queue cache on
     * first use.
     *
     * @param mq the queue the messages were read from
     * @param msgs the messages to cache, keyed by their queue offset
     */
    public void putMessages(final MessageQueue mq, final List<MessageExt> msgs) {
        CachedQueue cachedQueue = this.mqCachedTable.get(mq);
        if (null == cachedQueue) {
            // putIfAbsent keeps the creation atomic: if another thread
            // registered a cache for this queue first, use theirs instead of
            // overwriting it (a plain put would discard what it already holds).
            CachedQueue created = new CachedQueue();
            CachedQueue raced = this.mqCachedTable.putIfAbsent(mq, created);
            cachedQueue = (raced != null) ? raced : created;
        }
        for (MessageExt msg : msgs) {
            cachedQueue.getMsgCachedTable().put(msg.getQueueOffset(), msg);
        }
    }


    /**
     * Removes the message at {@code offset} from the cache of {@code mq}, if
     * that queue has a cache.
     *
     * @param mq the queue the message belongs to
     * @param offset queue offset of the message to drop
     */
    public void removeMessage(final MessageQueue mq, long offset) {
        CachedQueue cachedQueue = this.mqCachedTable.get(mq);
        if (null != cachedQueue) {
            cachedQueue.getMsgCachedTable().remove(offset);
        }
    }


    /**
     * Returns the smallest offset still cached for {@code mq}.
     *
     * @param mq the queue to inspect
     * @return the lowest cached offset, or {@code -1} when the queue has no
     *         cache or the cache is empty
     */
    public long commitableOffset(final MessageQueue mq) {
        CachedQueue cachedQueue = this.mqCachedTable.get(mq);
        // firstKey() throws on an empty sorted map, so an emptied cache is
        // reported with the same -1 sentinel as a missing one.
        // NOTE(review): the thread-safety of the inner table is not visible
        // here; confirm whether callers serialize access per queue.
        if (null != cachedQueue && !cachedQueue.getMsgCachedTable().isEmpty()) {
            return cachedQueue.getMsgCachedTable().firstKey();
        }

        return -1;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java new file mode 100644 index 0000000..68347a6 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.example.simple; + +import com.alibaba.rocketmq.client.QueryResult; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; + +public class TestProducer { + public static void main(String[] args) throws MQClientException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); + producer.start(); + + for (int i = 0; i < 1; i++) + try { + { + Message msg = new Message("TopicTest1", + "TagA", + "key113", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + + QueryResult queryMessage = + producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis()); + for (MessageExt m : queryMessage.getMessageList()) { + System.out.printf("%s%n", m); + } + } + + } catch (Exception e) { + e.printStackTrace(); + } + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java new file mode 100644 index 0000000..2e91e34 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package com.alibaba.rocketmq.example.transaction;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.concurrent.atomic.AtomicInteger;


/**
 * Demo check listener that cycles through every possible outcome (exception,
 * rollback, commit, unknown) based on an invocation counter.
 */
public class TransactionCheckListenerImpl implements TransactionCheckListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);


    /**
     * Logs the checked message and returns a state derived from how many times
     * this listener has been called so far.
     *
     * @param msg the message whose local transaction state is being checked
     * @return rollback on multiples of 5, commit on multiples of 4, otherwise
     *         unknown
     * @throws RuntimeException on every call whose index is a multiple of 6,
     *         including the first, to simulate a failing check
     */
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
        // The message text is passed as an argument: building it into the
        // format string would make printf throw on any '%' it contains.
        System.out.printf("server checking TrMsg %s%n", msg);

        int value = transactionIndex.getAndIncrement();
        if ((value % 6) == 0) {
            throw new RuntimeException("Could not find db");
        } else if ((value % 5) == 0) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if ((value % 4) == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        return LocalTransactionState.UNKNOW;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java new file mode 100644 index 0000000..cda523a --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.example.transaction; + +import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; +import com.alibaba.rocketmq.client.producer.LocalTransactionState; +import com.alibaba.rocketmq.common.message.Message; + +import java.util.concurrent.atomic.AtomicInteger; + +public class TransactionExecuterImpl implements LocalTransactionExecuter { + private AtomicInteger transactionIndex = new AtomicInteger(1); + + + @Override + public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { + int value = transactionIndex.getAndIncrement(); + + if (value == 0) { + throw new RuntimeException("Could not find db"); + } else if ((value % 5) == 0) { + return LocalTransactionState.ROLLBACK_MESSAGE; + } else if ((value % 4) == 0) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + return LocalTransactionState.UNKNOW; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java new file mode 100644 index 0000000..2c4745f --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package com.alibaba.rocketmq.example.transaction;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;

/**
 * Example transactional producer: sends 100 messages in transactions, then
 * stays alive for a long time before shutting down.
 */
public class TransactionProducer {
    /**
     * Configures and starts the transactional producer, sends the messages and
     * finally shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException if the producer cannot be started
     * @throws InterruptedException if a sleep is interrupted
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckThreadPoolMaxSize(2);
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(transactionCheckListener);
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
        for (int i = 0; i < 100; i++) {
            try {
                // Tags are assigned round-robin; the key embeds the loop index.
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        // Keep the process alive (100000 one-second sleeps) before shutting down.
        // NOTE(review): presumably so the registered check listener can still be
        // called back by the broker - confirm.
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/resources/MessageFilterImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/resources/MessageFilterImpl.java b/rocketmq-example/src/main/resources/MessageFilterImpl.java new file mode 100644 index 0000000..ea6f3d8 --- /dev/null +++ b/rocketmq-example/src/main/resources/MessageFilterImpl.java @@ -0,0 +1,22 @@
package com.alibaba.rocketmq.example.filter;

import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;


/**
 * Example message filter that selects messages by their {@code SequenceId}
 * property.
 */
public class MessageFilterImpl implements MessageFilter {

    /**
     * Accepts a message when its {@code SequenceId} property is a multiple of
     * 10 and greater than 100.
     *
     * @param msg the message to test
     * @return {@code true} when the message passes the filter; {@code false}
     *         when it does not or when the property is absent
     * @throws NumberFormatException if the property is present but is not a
     *         parsable integer
     */
    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if (((id % 10) == 0) && //
                (id > 100)) {
                return true;
            }
        }

        return false;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/pom.xml b/rocketmq-filtersrv/pom.xml new file mode 100644 index 0000000..28c360b --- /dev/null +++ b/rocketmq-filtersrv/pom.xml @@ -0,0 +1,62 @@
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-all</artifactId> + <version>4.0.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <artifactId>rocketmq-filtersrv</artifactId> + <name>rocketmq-filtersrv ${project.version}</name> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-client</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-store</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-srvutil</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java 
---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java new file mode 100644 index 0000000..b469b3f --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FilterServerOuterAPI.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.filtersrv; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.common.protocol.RequestCode; +import com.alibaba.rocketmq.common.protocol.ResponseCode; +import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; +import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; +import com.alibaba.rocketmq.remoting.RemotingClient; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; +import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; +import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; + + +/** + * @author shijia.wxr + */ +public class FilterServerOuterAPI { + private final RemotingClient remotingClient; + + + public FilterServerOuterAPI() { + this.remotingClient = new NettyRemotingClient(new NettyClientConfig()); + } + + + public void start() { + this.remotingClient.start(); + } + + + public void shutdown() { + this.remotingClient.shutdown(); + } + + + public RegisterFilterServerResponseHeader registerFilterServerToBroker( + final String brokerAddr, + final String filterServerAddr + ) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, InterruptedException, MQBrokerException { + RegisterFilterServerRequestHeader requestHeader = new RegisterFilterServerRequestHeader(); + requestHeader.setFilterServerAddr(filterServerAddr); + RemotingCommand request = + RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000); + 
assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + RegisterFilterServerResponseHeader responseHeader = + (RegisterFilterServerResponseHeader) response + .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class); + + return responseHeader; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java new file mode 100644 index 0000000..fac620f --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvConfig.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.filtersrv; + +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.annotation.ImportantField; +import com.alibaba.rocketmq.remoting.common.RemotingUtil; + + +public class FiltersrvConfig { + private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, + System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + + @ImportantField + private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, + System.getenv(MixAll.NAMESRV_ADDR_ENV)); + + private String connectWhichBroker = "127.0.0.1:10911"; + private String filterServerIP = RemotingUtil.getLocalAddress(); + + private int compressMsgBodyOverHowmuch = 1024 * 8; + private int zipCompressLevel = 5; + + + private boolean clientUploadFilterClassEnable = true; + + + private String filterClassRepertoryUrl = "http://fsrep.tbsite.net/filterclass"; + + private int fsServerAsyncSemaphoreValue = 2048; + private int fsServerCallbackExecutorThreads = 64; + private int fsServerWorkerThreads = 64; + + + public String getRocketmqHome() { + return rocketmqHome; + } + + + public void setRocketmqHome(String rocketmqHome) { + this.rocketmqHome = rocketmqHome; + } + + + public String getNamesrvAddr() { + return namesrvAddr; + } + + + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + + public String getConnectWhichBroker() { + return connectWhichBroker; + } + + + public void setConnectWhichBroker(String connectWhichBroker) { + this.connectWhichBroker = connectWhichBroker; + } + + + public String getFilterServerIP() { + return filterServerIP; + } + + + public void setFilterServerIP(String filterServerIP) { + this.filterServerIP = filterServerIP; + } + + + public int getCompressMsgBodyOverHowmuch() { + return compressMsgBodyOverHowmuch; + } + + + public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) { + this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; + } + + + public int 
getZipCompressLevel() { + return zipCompressLevel; + } + + + public void setZipCompressLevel(int zipCompressLevel) { + this.zipCompressLevel = zipCompressLevel; + } + + + public boolean isClientUploadFilterClassEnable() { + return clientUploadFilterClassEnable; + } + + + public void setClientUploadFilterClassEnable(boolean clientUploadFilterClassEnable) { + this.clientUploadFilterClassEnable = clientUploadFilterClassEnable; + } + + + public String getFilterClassRepertoryUrl() { + return filterClassRepertoryUrl; + } + + + public void setFilterClassRepertoryUrl(String filterClassRepertoryUrl) { + this.filterClassRepertoryUrl = filterClassRepertoryUrl; + } + + + public int getFsServerAsyncSemaphoreValue() { + return fsServerAsyncSemaphoreValue; + } + + + public void setFsServerAsyncSemaphoreValue(int fsServerAsyncSemaphoreValue) { + this.fsServerAsyncSemaphoreValue = fsServerAsyncSemaphoreValue; + } + + + public int getFsServerCallbackExecutorThreads() { + return fsServerCallbackExecutorThreads; + } + + + public void setFsServerCallbackExecutorThreads(int fsServerCallbackExecutorThreads) { + this.fsServerCallbackExecutorThreads = fsServerCallbackExecutorThreads; + } + + + public int getFsServerWorkerThreads() { + return fsServerWorkerThreads; + } + + + public void setFsServerWorkerThreads(int fsServerWorkerThreads) { + this.fsServerWorkerThreads = fsServerWorkerThreads; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java new file mode 100644 index 0000000..0e3f696 --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvController.java @@ -0,0 +1,226 @@ +/** + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.filtersrv; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.ThreadFactoryImpl; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; +import com.alibaba.rocketmq.filtersrv.filter.FilterClassManager; +import com.alibaba.rocketmq.filtersrv.processor.DefaultRequestProcessor; +import com.alibaba.rocketmq.filtersrv.stats.FilterServerStatsManager; +import com.alibaba.rocketmq.remoting.RemotingServer; +import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer; +import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +/** + * @author shijia.wxr + */ +public class FiltersrvController { + private static final Logger log = 
LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + + private final FiltersrvConfig filtersrvConfig; + + private final NettyServerConfig nettyServerConfig; + private final FilterClassManager filterClassManager; + + private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI(); + private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer( + MixAll.FILTERSRV_CONSUMER_GROUP); + + private final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread")); + private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager(); + + private RemotingServer remotingServer; + + private ExecutorService remotingExecutor; + private volatile String brokerName = null; + + + public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) { + this.filtersrvConfig = filtersrvConfig; + this.nettyServerConfig = nettyServerConfig; + this.filterClassManager = new FilterClassManager(this); + } + + + public boolean initialize() { + + MixAll.printObjectProperties(log, this.filtersrvConfig); + + + this.remotingServer = new NettyRemotingServer(this.nettyServerConfig); + + + this.remotingExecutor = + Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), + new ThreadFactoryImpl("RemotingExecutorThread_")); + + this.registerProcessor(); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + FiltersrvController.this.registerFilterServerToBroker(); + } + }, 3, 10, TimeUnit.SECONDS); + + this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer + .getBrokerSuspendMaxTimeMillis() - 1000); + this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer + .getConsumerTimeoutMillisWhenSuspend() - 1000); + + 
this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr()); + this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid())); + + return true; + } + + private void registerProcessor() { + this.remotingServer + .registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); + } + + public void registerFilterServerToBroker() { + try { + RegisterFilterServerResponseHeader responseHeader = + this.filterServerOuterAPI.registerFilterServerToBroker( + this.filtersrvConfig.getConnectWhichBroker(), this.localAddr()); + this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() + .setDefaultBrokerId(responseHeader.getBrokerId()); + + if (null == this.brokerName) { + this.brokerName = responseHeader.getBrokerName(); + } + + log.info("register filter server<{}> to broker<{}> OK, Return: {} {}", + this.localAddr(), + this.filtersrvConfig.getConnectWhichBroker(), + responseHeader.getBrokerName(), + responseHeader.getBrokerId()); + } catch (Exception e) { + log.warn("register filter server Exception", e); + + log.warn("access broker failed, kill oneself"); + System.exit(-1); + } + } + + public String localAddr() { + return String.format("%s:%d", this.filtersrvConfig.getFilterServerIP(), + this.remotingServer.localListenPort()); + } + + public void start() throws Exception { + this.defaultMQPullConsumer.start(); + this.remotingServer.start(); + this.filterServerOuterAPI.start(); + this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper() + .setConnectBrokerByUser(true); + this.filterClassManager.start(); + this.filterServerStatsManager.start(); + } + + + public void shutdown() { + this.remotingServer.shutdown(); + this.remotingExecutor.shutdown(); + this.scheduledExecutorService.shutdown(); + this.defaultMQPullConsumer.shutdown(); + this.filterServerOuterAPI.shutdown(); + this.filterClassManager.shutdown(); + this.filterServerStatsManager.shutdown(); + } + + + public RemotingServer 
getRemotingServer() { + return remotingServer; + } + + + public void setRemotingServer(RemotingServer remotingServer) { + this.remotingServer = remotingServer; + } + + + public ExecutorService getRemotingExecutor() { + return remotingExecutor; + } + + + public void setRemotingExecutor(ExecutorService remotingExecutor) { + this.remotingExecutor = remotingExecutor; + } + + + public FiltersrvConfig getFiltersrvConfig() { + return filtersrvConfig; + } + + + public NettyServerConfig getNettyServerConfig() { + return nettyServerConfig; + } + + + public ScheduledExecutorService getScheduledExecutorService() { + return scheduledExecutorService; + } + + + public FilterServerOuterAPI getFilterServerOuterAPI() { + return filterServerOuterAPI; + } + + + public FilterClassManager getFilterClassManager() { + return filterClassManager; + } + + + public DefaultMQPullConsumer getDefaultMQPullConsumer() { + return defaultMQPullConsumer; + } + + + public String getBrokerName() { + return brokerName; + } + + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + + public FilterServerStatsManager getFilterServerStatsManager() { + return filterServerStatsManager; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java new file mode 100644 index 0000000..3fe6b22 --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.filtersrv; + +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.joran.JoranConfigurator; +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; +import com.alibaba.rocketmq.remoting.netty.NettySystemConfig; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * @author shijia.wxr + */ +public class FiltersrvStartup { + public static Logger log; + + public static void main(String[] args) { + start(createController(args)); + } + + public static FiltersrvController start(FiltersrvController controller) { + + try { + controller.start(); + } catch (Exception e) { + e.printStackTrace(); + System.exit(-1); 
+ } + + String tip = "The Filter Server boot success, " + controller.localAddr(); + log.info(tip); + System.out.printf("%s%n", tip); + + return controller; + } + + public static FiltersrvController createController(String[] args) { + System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + + + if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { + NettySystemConfig.socketSndbufSize = 65535; + } + + + if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { + NettySystemConfig.socketRcvbufSize = 1024; + } + + try { + Options options = ServerUtil.buildCommandlineOptions(new Options()); + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqfiltersrv", args, buildCommandlineOptions(options), + new PosixParser()); + if (null == commandLine) { + System.exit(-1); + return null; + } + + final FiltersrvConfig filtersrvConfig = new FiltersrvConfig(); + final NettyServerConfig nettyServerConfig = new NettyServerConfig(); + + if (commandLine.hasOption('c')) { + String file = commandLine.getOptionValue('c'); + if (file != null) { + InputStream in = new BufferedInputStream(new FileInputStream(file)); + Properties properties = new Properties(); + properties.load(in); + MixAll.properties2Object(properties, filtersrvConfig); + System.out.printf("load config properties file OK, " + file + "%n"); + in.close(); + + String port = properties.getProperty("listenPort"); + if (port != null) { + filtersrvConfig.setConnectWhichBroker(String.format("127.0.0.1:%s", port)); + } + } + } + + nettyServerConfig.setListenPort(0); + nettyServerConfig.setServerAsyncSemaphoreValue(filtersrvConfig.getFsServerAsyncSemaphoreValue()); + nettyServerConfig.setServerCallbackExecutorThreads(filtersrvConfig + .getFsServerCallbackExecutorThreads()); + nettyServerConfig.setServerWorkerThreads(filtersrvConfig.getFsServerWorkerThreads()); + + if (commandLine.hasOption('p')) { + 
MixAll.printObjectProperties(null, filtersrvConfig); + MixAll.printObjectProperties(null, nettyServerConfig); + System.exit(0); + } + + MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), filtersrvConfig); + if (null == filtersrvConfig.getRocketmqHome()) { + System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV + + " variable in your environment to match the location of the RocketMQ installation%n"); + System.exit(-2); + } + + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + JoranConfigurator configurator = new JoranConfigurator(); + configurator.setContext(lc); + lc.reset(); + configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml"); + log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + + final FiltersrvController controller = + new FiltersrvController(filtersrvConfig, nettyServerConfig); + boolean initResult = controller.initialize(); + if (!initResult) { + controller.shutdown(); + System.exit(-3); + } + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + private volatile boolean hasShutdown = false; + private AtomicInteger shutdownTimes = new AtomicInteger(0); + + @Override + public void run() { + synchronized (this) { + log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); + if (!this.hasShutdown) { + this.hasShutdown = true; + long begineTime = System.currentTimeMillis(); + controller.shutdown(); + long consumingTimeTotal = System.currentTimeMillis() - begineTime; + log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); + } + } + } + }, "ShutdownHook")); + + return controller; + } catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); + } + return null; + } + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("c", "configFile", true, "Filter server config properties file"); + opt.setRequired(false); + options.addOption(opt); + + opt = 
new Option("p", "printConfigItem", false, "Print all config item"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java new file mode 100644 index 0000000..e17e5d2 --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/DynaCode.java @@ -0,0 +1,393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.filtersrv.filter; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.filter.FilterAPI; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.tools.JavaCompiler; +import javax.tools.ToolProvider; +import java.io.*; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.net.URLDecoder; +import java.util.*; + + +public class DynaCode { + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + + private static final String FILE_SP = System.getProperty("file.separator"); + + private static final String LINE_SP = System.getProperty("line.separator"); + + private String sourcePath = System.getProperty("user.home") + FILE_SP + "rocketmq_filter_class" + FILE_SP + + UtilAll.getPid(); + + private String outPutClassPath = sourcePath; + + + private ClassLoader parentClassLoader; + + + private List<String> codeStrs; + + + private Map<String/* fullClassName */, Class<?>/* class */> loadClass; + + + private String classpath; + + + private String bootclasspath; + + + private String extdirs; + + + private String encoding = "UTF-8"; + + + private String target; + + + @SuppressWarnings("unchecked") + public DynaCode(String code) { + this(Thread.currentThread().getContextClassLoader(), Arrays.asList(code)); + } + + + public DynaCode(ClassLoader parentClassLoader, List<String> codeStrs) { + this(extractClasspath(parentClassLoader), parentClassLoader, codeStrs); + } + + + public DynaCode(String classpath, ClassLoader parentClassLoader, List<String> codeStrs) { + this.classpath = classpath; + this.parentClassLoader = parentClassLoader; + this.codeStrs = codeStrs; + this.loadClass = new HashMap<String, Class<?>>(codeStrs.size()); + } + + + private 
static String extractClasspath(ClassLoader cl) { + StringBuffer buf = new StringBuffer(); + while (cl != null) { + if (cl instanceof URLClassLoader) { + URL urls[] = ((URLClassLoader) cl).getURLs(); + for (int i = 0; i < urls.length; i++) { + if (buf.length() > 0) { + buf.append(File.pathSeparatorChar); + } + String s = urls[i].getFile(); + try { + s = URLDecoder.decode(s, "UTF-8"); + } catch (UnsupportedEncodingException e) { + continue; + } + File f = new File(s); + buf.append(f.getAbsolutePath()); + } + } + cl = cl.getParent(); + } + return buf.toString(); + } + + + public DynaCode(List<String> codeStrs) { + this(Thread.currentThread().getContextClassLoader(), codeStrs); + } + + public static Class<?> compileAndLoadClass(final String className, final String javaSource) + throws Exception { + String classSimpleName = FilterAPI.simpleClassName(className); + String javaCode = javaSource; + + final String newClassSimpleName = classSimpleName + System.currentTimeMillis(); + String newJavaCode = javaCode.replaceAll(classSimpleName, newClassSimpleName); + + List<String> codes = new ArrayList<String>(); + codes.add(newJavaCode); + DynaCode dc = new DynaCode(codes); + dc.compileAndLoadClass(); + Map<String, Class<?>> map = dc.getLoadClass(); + + Class<?> clazz = map.get(getQualifiedName(newJavaCode)); + return clazz; + } + + public void compileAndLoadClass() throws Exception { + String[] sourceFiles = this.uploadSrcFile(); + this.compile(sourceFiles); + this.loadClass(this.loadClass.keySet()); + } + + public Map<String, Class<?>> getLoadClass() { + return loadClass; + } + + public static String getQualifiedName(String code) { + StringBuilder sb = new StringBuilder(); + String className = getClassName(code); + if (StringUtils.isNotBlank(className)) { + + String packageName = getPackageName(code); + if (StringUtils.isNotBlank(packageName)) { + sb.append(packageName).append("."); + } + sb.append(className); + } + return sb.toString(); + } + + private String[] 
uploadSrcFile() throws Exception { + List<String> srcFileAbsolutePaths = new ArrayList<String>(codeStrs.size()); + for (String code : codeStrs) { + if (StringUtils.isNotBlank(code)) { + String packageName = getPackageName(code); + String className = getClassName(code); + if (StringUtils.isNotBlank(className)) { + File srcFile = null; + BufferedWriter bufferWriter = null; + try { + if (StringUtils.isBlank(packageName)) { + File pathFile = new File(sourcePath); + + if (!pathFile.exists()) { + if (!pathFile.mkdirs()) { + throw new RuntimeException("create PathFile Error!"); + } + } + srcFile = new File(sourcePath + FILE_SP + className + ".java"); + } else { + String srcPath = StringUtils.replace(packageName, ".", FILE_SP); + File pathFile = new File(sourcePath + FILE_SP + srcPath); + + if (!pathFile.exists()) { + if (!pathFile.mkdirs()) { + throw new RuntimeException("create PathFile Error!"); + } + } + srcFile = new File(pathFile.getAbsolutePath() + FILE_SP + className + ".java"); + } + synchronized (loadClass) { + loadClass.put(getFullClassName(code), null); + } + if (null != srcFile) { + LOGGER.warn("Dyna Create Java Source File:---->" + srcFile.getAbsolutePath()); + srcFileAbsolutePaths.add(srcFile.getAbsolutePath()); + srcFile.deleteOnExit(); + } + OutputStreamWriter outputStreamWriter = + new OutputStreamWriter(new FileOutputStream(srcFile), encoding); + bufferWriter = new BufferedWriter(outputStreamWriter); + for (String lineCode : code.split(LINE_SP)) { + bufferWriter.write(lineCode); + bufferWriter.newLine(); + } + bufferWriter.flush(); + } finally { + if (null != bufferWriter) { + bufferWriter.close(); + } + } + } + } + } + return srcFileAbsolutePaths.toArray(new String[srcFileAbsolutePaths.size()]); + } + + private void compile(String[] srcFiles) throws Exception { + String args[] = this.buildCompileJavacArgs(srcFiles); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + if (compiler 
== null) { + throw new NullPointerException( + "ToolProvider.getSystemJavaCompiler() return null,please use JDK replace JRE!"); + } + int resultCode = compiler.run(null, null, err, args); + if (resultCode != 0) { + throw new Exception(err.toString(RemotingHelper.DEFAULT_CHARSET)); + } + } + + private void loadClass(Set<String> classFullNames) throws ClassNotFoundException, MalformedURLException { + synchronized (loadClass) { + ClassLoader classLoader = + new URLClassLoader(new URL[]{new File(outPutClassPath).toURI().toURL()}, + parentClassLoader); + for (String key : classFullNames) { + Class<?> classz = classLoader.loadClass(key); + if (null != classz) { + loadClass.put(key, classz); + LOGGER.info("Dyna Load Java Class File OK:----> className: " + key); + } else { + LOGGER.error("Dyna Load Java Class File Fail:----> className: " + key); + } + } + } + } + + public static String getClassName(String code) { + String className = StringUtils.substringBefore(code, "{"); + if (StringUtils.isBlank(className)) { + return className; + } + if (StringUtils.contains(code, " class ")) { + className = StringUtils.substringAfter(className, " class "); + if (StringUtils.contains(className, " extends ")) { + className = StringUtils.substringBefore(className, " extends ").trim(); + } else if (StringUtils.contains(className, " implements ")) { + className = StringUtils.trim(StringUtils.substringBefore(className, " implements ")); + } else { + className = StringUtils.trim(className); + } + } else if (StringUtils.contains(code, " interface ")) { + className = StringUtils.substringAfter(className, " interface "); + if (StringUtils.contains(className, " extends ")) { + className = StringUtils.substringBefore(className, " extends ").trim(); + } else { + className = StringUtils.trim(className); + } + } else if (StringUtils.contains(code, " enum ")) { + className = StringUtils.trim(StringUtils.substringAfter(className, " enum ")); + } else { + return StringUtils.EMPTY; + } + return 
className; + } + + public static String getPackageName(String code) { + String packageName = + StringUtils.substringBefore(StringUtils.substringAfter(code, "package "), ";").trim(); + return packageName; + } + + public static String getFullClassName(String code) { + String packageName = getPackageName(code); + String className = getClassName(code); + return StringUtils.isBlank(packageName) ? className : packageName + "." + className; + } + + private String[] buildCompileJavacArgs(String srcFiles[]) { + ArrayList<String> args = new ArrayList<String>(); + if (StringUtils.isNotBlank(classpath)) { + args.add("-classpath"); + args.add(classpath); + } + if (StringUtils.isNotBlank(outPutClassPath)) { + args.add("-d"); + args.add(outPutClassPath); + } + if (StringUtils.isNotBlank(sourcePath)) { + args.add("-sourcepath"); + args.add(sourcePath); + } + if (StringUtils.isNotBlank(bootclasspath)) { + args.add("-bootclasspath"); + args.add(bootclasspath); + } + if (StringUtils.isNotBlank(extdirs)) { + args.add("-extdirs"); + args.add(extdirs); + } + if (StringUtils.isNotBlank(encoding)) { + args.add("-encoding"); + args.add(encoding); + } + if (StringUtils.isNotBlank(target)) { + args.add("-target"); + args.add(target); + } + for (int i = 0; i < srcFiles.length; i++) { + args.add(srcFiles[i]); + } + return args.toArray(new String[args.size()]); + } + + public String getOutPutClassPath() { + return outPutClassPath; + } + + public void setOutPutClassPath(String outPutClassPath) { + this.outPutClassPath = outPutClassPath; + } + + public String getSourcePath() { + return sourcePath; + } + + public void setSourcePath(String sourcePath) { + this.sourcePath = sourcePath; + } + + public ClassLoader getParentClassLoader() { + return parentClassLoader; + } + + public void setParentClassLoader(ClassLoader parentClassLoader) { + this.parentClassLoader = parentClassLoader; + } + + public String getClasspath() { + return classpath; + } + + public void setClasspath(String classpath) { + 
this.classpath = classpath; + } + + public String getBootclasspath() { + return bootclasspath; + } + + public void setBootclasspath(String bootclasspath) { + this.bootclasspath = bootclasspath; + } + + public String getExtdirs() { + return extdirs; + } + + public void setExtdirs(String extdirs) { + this.extdirs = extdirs; + } + + public String getEncoding() { + return encoding; + } + + public void setEncoding(String encoding) { + this.encoding = encoding; + } + + public String getTarget() { + return target; + } + + public void setTarget(String target) { + this.target = target; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java new file mode 100644 index 0000000..27b19ce --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassFetchMethod.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.filtersrv.filter; + +public interface FilterClassFetchMethod { + public String fetch(final String topic, final String consumerGroup, final String className); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java new file mode 100644 index 0000000..f3e747e --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassInfo.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.filtersrv.filter; + +import com.alibaba.rocketmq.common.filter.MessageFilter; + + +public class FilterClassInfo { + private String className; + private int classCRC; + private MessageFilter messageFilter; + + + public int getClassCRC() { + return classCRC; + } + + + public void setClassCRC(int classCRC) { + this.classCRC = classCRC; + } + + + public MessageFilter getMessageFilter() { + return messageFilter; + } + + + public void setMessageFilter(MessageFilter messageFilter) { + this.messageFilter = messageFilter; + } + + + public String getClassName() { + return className; + } + + + public void setClassName(String className) { + this.className = className; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java new file mode 100644 index 0000000..8966ca2 --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassLoader.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Class loader that publicly exposes the protected
 * {@link ClassLoader#defineClass(String, byte[], int, int)} so that callers can
 * turn raw class-file bytes into a {@link Class}.
 */
public class FilterClassLoader extends ClassLoader {
    /**
     * Defines a new class from a slice of a byte array.
     *
     * @param name expected binary name of the class
     * @param b    buffer holding the class-file bytes
     * @param off  offset of the first class-file byte in {@code b}
     * @param len  number of class-file bytes
     * @return the class created from the given bytes
     * @throws ClassFormatError if the bytes are not a valid class file
     */
    public final Class<?> createNewClass(String name, byte[] b, int off, int len) throws ClassFormatError {
        final Class<?> definedClass = defineClass(name, b, off, len);
        return definedClass;
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.filtersrv.filter; + +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.ThreadFactoryImpl; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.filter.MessageFilter; +import com.alibaba.rocketmq.filtersrv.FiltersrvController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +public class FilterClassManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + + private final Object compileLock = new Object(); + private final FiltersrvController filtersrvController; + + private final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread")); + private ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable = + new ConcurrentHashMap<String, FilterClassInfo>(128); + private FilterClassFetchMethod filterClassFetchMethod; + + + public FilterClassManager(FiltersrvController filtersrvController) { + this.filtersrvController = filtersrvController; + this.filterClassFetchMethod = + new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig() + .getFilterClassRepertoryUrl()); + } + + + 
public void start() { + if (!this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + fetchClassFromRemoteHost(); + } + }, 1, 1, TimeUnit.MINUTES); + } + } + + private void fetchClassFromRemoteHost() { + Iterator<Entry<String, FilterClassInfo>> it = this.filterClassTable.entrySet().iterator(); + while (it.hasNext()) { + try { + Entry<String, FilterClassInfo> next = it.next(); + FilterClassInfo filterClassInfo = next.getValue(); + String[] topicAndGroup = next.getKey().split("@"); + String responseStr = + this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1], + filterClassInfo.getClassName()); + byte[] filterSourceBinary = responseStr.getBytes("UTF-8"); + int classCRC = UtilAll.crc32(responseStr.getBytes("UTF-8")); + if (classCRC != filterClassInfo.getClassCRC()) { + String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); + Class<?> newClass = + DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource); + Object newInstance = newClass.newInstance(); + filterClassInfo.setMessageFilter((MessageFilter) newInstance); + filterClassInfo.setClassCRC(classCRC); + + log.info("fetch Remote class File OK, {} {}", next.getKey(), + filterClassInfo.getClassName()); + } + } catch (Exception e) { + log.error("fetchClassFromRemoteHost Exception", e); + } + } + } + + public void shutdown() { + this.scheduledExecutorService.shutdown(); + } + + public boolean registerFilterClass(final String consumerGroup, final String topic, + final String className, final int classCRC, final byte[] filterSourceBinary) { + final String key = buildKey(consumerGroup, topic); + + + boolean registerNew = false; + FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key); + if (null == filterClassInfoPrev) { + registerNew = true; + } else { + if 
(this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { + if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { + registerNew = true; + } + } + } + + if (registerNew) { + synchronized (this.compileLock) { + filterClassInfoPrev = this.filterClassTable.get(key); + if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) { + return true; + } + + try { + + FilterClassInfo filterClassInfoNew = new FilterClassInfo(); + filterClassInfoNew.setClassName(className); + filterClassInfoNew.setClassCRC(0); + filterClassInfoNew.setMessageFilter(null); + + if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { + String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); + Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource); + Object newInstance = newClass.newInstance(); + filterClassInfoNew.setMessageFilter((MessageFilter) newInstance); + filterClassInfoNew.setClassCRC(classCRC); + } + + this.filterClassTable.put(key, filterClassInfoNew); + } catch (Throwable e) { + String info = + String + .format( + "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s", + consumerGroup, topic, className); + log.error(info, e); + return false; + } + } + } + + return true; + } + + private static String buildKey(final String consumerGroup, final String topic) { + return topic + "@" + consumerGroup; + } + + public FilterClassInfo findFilterClass(final String consumerGroup, final String topic) { + return this.filterClassTable.get(buildKey(consumerGroup, topic)); + } + + + public FilterClassFetchMethod getFilterClassFetchMethod() { + return filterClassFetchMethod; + } + + + public void setFilterClassFetchMethod(FilterClassFetchMethod filterClassFetchMethod) { + this.filterClassFetchMethod = filterClassFetchMethod; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java ---------------------------------------------------------------------- diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java new file mode 100644 index 0000000..88cb572 --- /dev/null +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/HttpFilterClassFetchMethod.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.filtersrv.filter; + +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.utils.HttpTinyClient; +import com.alibaba.rocketmq.common.utils.HttpTinyClient.HttpResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HttpFilterClassFetchMethod implements FilterClassFetchMethod { + private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME); + private final String url; + + + public HttpFilterClassFetchMethod(String url) { + this.url = url; + } + + + @Override + public String fetch(String topic, String consumerGroup, String className) { + String thisUrl = String.format("%s/%s.java", this.url, className); + + try { + HttpResult result = HttpTinyClient.httpGet(thisUrl, null, null, "UTF-8", 5000); + if (200 == result.code) { + return result.content; + } + } catch (Exception e) { + log.error( + String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e); + } + + return null; + } +}
