Repository: incubator-rocketmq-site
Updated Branches:
  refs/heads/asf-site 93c5936c5 -> 3ee8692be


[ROCKETMQ-121]Docs of filtering messages based on SQL92 closes 
apache/incubator-rocketmq-site#11


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/commit/42a8b8c9
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/tree/42a8b8c9
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/diff/42a8b8c9

Branch: refs/heads/asf-site
Commit: 42a8b8c9e4294081d9b37737019cf152cd64a025
Parents: c87ce9e
Author: vsair <[email protected]>
Authored: Wed Apr 26 17:07:36 2017 +0800
Committer: dongeforever <[email protected]>
Committed: Wed Apr 26 17:07:36 2017 +0800

----------------------------------------------------------------------
 _data/navigation.yml                            |   2 +
 _docs/19-filter-by-sql92-example.md             | 109 +++++++++++++++++++
 ...4-26-filter-messages-by-sql92-in-rocketmq.md | 105 ++++++++++++++++++
 assets/images/blog/filter_build_cq_apache.png   | Bin 0 -> 16104 bytes
 assets/images/blog/filter_structure_apach.png   | Bin 0 -> 21027 bytes
 5 files changed, 216 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/42a8b8c9/_data/navigation.yml
----------------------------------------------------------------------
diff --git a/_data/navigation.yml b/_data/navigation.yml
index b239dd5..7efeea7 100644
--- a/_data/navigation.yml
+++ b/_data/navigation.yml
@@ -33,6 +33,8 @@ docs:
         url: /docs/schedule-example/
       - title: "Batch Example"
         url: /docs/batch-example/
+      - title: "Filter By SQL92 Example"
+        url: /docs/filter-by-sql92-example/
 
   - title: Deployment & Operations
     children:

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/42a8b8c9/_docs/19-filter-by-sql92-example.md
----------------------------------------------------------------------
diff --git a/_docs/19-filter-by-sql92-example.md b/_docs/19-filter-by-sql92-example.md
new file mode 100644
index 0000000..111d616
--- /dev/null
+++ b/_docs/19-filter-by-sql92-example.md
@@ -0,0 +1,109 @@
+---
+title: "Filter By SQL92 Example"
+permalink: /docs/filter-by-sql92-example/
+excerpt: "How to filter messages by SQL92 in Apache RocketMQ."
+modified: 2017-04-26T16:35:00-04:00
+---
+
+
+{% include toc %}
+
+In most cases, a tag is a simple and useful way to select the messages you want. For example:
+
+```java
+DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
+consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
+```
+
+The consumer will receive messages that contain TAGA, TAGB, or TAGC. The limitation is that a message can have only one tag, which may not suit more sophisticated scenarios. In such cases, you can use an SQL expression to select messages.
+
+### Principle
+
+The SQL feature evaluates an expression against the properties you put in messages when sending them. Using the grammar defined by RocketMQ, you can implement flexible selection logic. Here is an example:
+
+<pre>
+------------
+| message  |
+|----------|  a > 5 AND b = 'abc'
+| a = 10   |  --------------------> Gotten
+| b = 'abc'|
+| c = true |
+------------
+------------
+| message  |
+|----------|   a > 5 AND b = 'abc'
+| a = 1    |  --------------------> Missed
+| b = 'abc'|
+| c = true |
+------------
+</pre>
+
+### Grammars
+
+RocketMQ defines only a basic grammar to support this feature. If that is not enough, you can extend it easily.
+
+1. Numeric comparison, like `>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
+2. Character comparison, like `=`, `<>`, `IN`;
+3. `IS NULL` or `IS NOT NULL`;
+4. Logical `AND`, logical `OR`, logical `NOT`;
+
+Constant types are:
+
+1. Numeric, like 123, 3.1415;
+2. Character, like 'abc', must be enclosed in single quotes;
+3. `NULL`, special constant;
+4. Boolean, `TRUE` or `FALSE`;
+
+### Interface
+
+Only push consumers can select messages by SQL92. The interface is:
+
+`public void subscribe(final String topic, final MessageSelector messageSelector)`
+
+### Examples
+
+You can put properties in a message through the `putUserProperty` method when sending.
+
+```java
+DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+try {
+    producer.start();
+} catch (MQClientException e) {
+    e.printStackTrace();
+    return;
+}
+
+// `tag` and `i` are assumed to be defined by the surrounding code (for example, a loop index).
+Message msg = new Message("TopicTest",
+    tag,
+    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
+);
+// Set some properties.
+msg.putUserProperty("a", String.valueOf(i));
+
+SendResult sendResult = producer.send(msg);
+   
+producer.shutdown();
+```
+
+Use `MessageSelector.bySql` to select messages through SQL92 when consuming.
+
+```java
+DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
+
+try {
+    // Only subscribe to messages that have property a, with a >= 0 and a <= 3.
+    consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
+} catch (MQClientException e) {
+    e.printStackTrace();
+    return;
+}
+
+consumer.registerMessageListener(new MessageListenerConcurrently() {
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    }
+});
+consumer.start();
+```
+ 
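
Note on the `Principle` section of `_docs/19-filter-by-sql92-example.md` above: the two diagrams can be reproduced with a small, self-contained sketch. This is only an illustration under stated assumptions, not RocketMQ code. The class `SqlFilterSketch`, the `FILTER` predicate, and the `message` helper are hypothetical names, and the expression `a > 5 AND b = 'abc'` is hand-coded rather than compiled from SQL92 text as the broker does. Properties are modeled as strings because `putUserProperty` takes string values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: a hand-coded equivalent of "a > 5 AND b = 'abc'".
// Nothing in this class is RocketMQ API.
class SqlFilterSketch {

    // Equivalent of the expression: a > 5 AND b = 'abc'
    static final Predicate<Map<String, String>> FILTER = props -> {
        String a = props.get("a");
        String b = props.get("b");
        if (a == null || b == null) {
            return false; // assumption: a missing property never matches
        }
        try {
            return Double.parseDouble(a) > 5 && "abc".equals(b);
        } catch (NumberFormatException e) {
            return false; // assumption: a non-numeric "a" never matches
        }
    };

    // Builds the property map of one message from the diagram.
    static Map<String, String> message(String a, String b, String c) {
        Map<String, String> props = new HashMap<>();
        props.put("a", a);
        props.put("b", b);
        props.put("c", c);
        return props;
    }

    public static void main(String[] args) {
        // First diagram: a = 10, b = 'abc', c = true
        System.out.println(FILTER.test(message("10", "abc", "true")));
        // Second diagram: a = 1, b = 'abc', c = true
        System.out.println(FILTER.test(message("1", "abc", "true")));
    }
}
```

The first message satisfies both conditions and is delivered; the second fails `a > 5` and is skipped, whatever the value of `c`.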

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/42a8b8c9/_posts/2017-04-26-filter-messages-by-sql92-in-rocketmq.md
----------------------------------------------------------------------
diff --git a/_posts/2017-04-26-filter-messages-by-sql92-in-rocketmq.md b/_posts/2017-04-26-filter-messages-by-sql92-in-rocketmq.md
new file mode 100644
index 0000000..1a49d22
--- /dev/null
+++ b/_posts/2017-04-26-filter-messages-by-sql92-in-rocketmq.md
@@ -0,0 +1,105 @@
+---
+title: "Filter Messages By SQL92 In RocketMQ"
+categories:
+  - RocketMQ
+tags:
+  - RocketMQ
+  - Filter
+---
+
+
+So far, RocketMQ has supported message filtering only by `TAG`, but a message can have only one tag, which is too limited to meet complex business requirements.
+
+So, we want to define and implement a reasonable filter language based on a subset of the SQL92 expression syntax to support customized message filtering.
+
+### Why Subset Of SQL92
+
+The purpose of this issue is to give RocketMQ the ability to filter messages. SQL92 is widely used and most people are familiar with it, so it is reasonable to select it as RocketMQ's filter grammar.
+
+As far as I know, ActiveMQ already implements this functionality based on JavaCC; it is simple and extensible. So I extracted it and integrated it into RocketMQ, keeping only part of the grammar:
+
+1. Numeric comparison, like `>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
+2. Character comparison, like `=`, `<>`, `IN`;
+3. `IS NULL` or `IS NOT NULL`;
+4. Logical `AND`, logical `OR`, logical `NOT`;
+
+Constant types are:
+
+1. Numeric, like 123, 3.1415;
+2. Character, like 'abc', must be enclosed in single quotes;
+3. `NULL`, special constant;
+4. Boolean, `TRUE` or `FALSE`;
+
+### Design
+ - Structure
+
+![screenshot](/assets/images/blog/filter_structure_apach.png)
+
+
+1. The broker collects each consumer's expression through heartbeat requests and saves it in `ConsumerFilterManager`.
+2. When a consumer pulls messages, the broker constructs a `MessageFilter` (an interface) with the compiled expression and subscription data to select matching messages in `CommitLog`.
+
+The main logic is simple.
+
+ - New Module, rocketmq-filter
+
+The implementation of the SQL92 language is placed in this module, which depends on the common module.
+
+The broker compiles and evaluates expressions through the `FilterSpi` interface. `FilterFactory` manages all `FilterSpi` implementations and also allows new ones to be registered.
+
+ - How to manage consumer's expression data
+
+Unlike tag filtering, an SQL92 expression must be compiled first to check whether it is legal, and the compiled expression is then used for evaluation. This procedure is designed to take place at the broker.
+
+`ConsumerManager` manages the subscriptions of push consumers, and `ConsumerFilterManager` manages the expression info of push consumers that wish to filter messages by a special language. The info includes the data version, expression, compiled expression, alive time, etc.
+
+ - How to filter message by expression
+
+I redesigned the `getMessage` interface of `MessageStore` by replacing the last parameter, `SubscriptionData`, with `MessageFilter`, which has also been refactored. The purpose is to decouple the `rocketmq-store` module from the protocol.
+
+When getting messages, the `ExpressionMessageFilter` implementation checks whether a message matches, either by the `BitsArray` (described later) or by evaluating the expression, similar to the mechanism of tag filtering.
+
+ - Optimization, pre-calculate the filtering result when building the consume queue
+
+Filtering at the time messages are pulled performs poorly:
+
+1. Message data is copied from off-heap to heap once for every consumer subscribed to the same topic that pulls the message.
+2. Message properties are decoded once for every consumer subscribed to the same topic that pulls the message.
+
+`BloomFilter` and pre-calculation are adopted to optimize the situation:
+
+
+![screenshot](/assets/images/blog/filter_build_cq_apache.png)
+
+1. Every consumer is assigned some bit positions of the `BloomFilter` when it registers with the broker.
+2. When the broker builds the consume queue after a message is written to `CommitLog`, each consumer's filtering result is calculated, and all results are assembled into a `BitsArray` saved in `ConsumeQueueExt`.
+3. `ConsumeQueueExt` is a store file linked to `ConsumeQueue`. `ConsumeQueue` can find the data through the `tagsCode`, which is replaced by the address generated by `ConsumeQueueExt` (for compatibility, the range is Long.MIN\_VALUE to Integer.MIN\_VALUE).
+4. `ExpressionMessageFilter` can use the `BitsArray` to check whether a message matches. Because of `BloomFilter` collisions, it still needs to decode the properties and evaluate the expression for messages that match (this could be reduced by checking for collisions, which is not included in this edition).
+
+This optimization is suitable for:
+
+1. High subscription ratio.
+2. Large properties.
+
+This optimization is off by default. To switch it on, set the following configs when starting the broker:
+
+1. enableCalcFilterBitMap = true, which means the bitmap is calculated when building the consume queue.
+2. expectConsumerNumUseFilter = XX (Integer, default is 32), which is the estimated number of consumers subscribing to the same topic.
+3. maxErrorRateOfBloomFilter = XX (1~100, default is 20), which is the error rate of the bloom filter.
+4. enableConsumeQueueExt = true, which means the consume queue extension file is constructed.
+
+### Interface
+
+Only push consumers can filter messages by SQL92 expression in this edition. The interface is:
+
+`public void subscribe(final String topic, final MessageSelector messageSelector)`
+
+### Performance Comparison
+
+Configuration of broker machine: 32 cores, 128G memory, 1000Mb/s full-duplex dual network
+
+The producer sends messages with a 1k body and 1k properties.
+
+Five consumers consume messages through the push model; each consumer gets 1/5 of the total messages.
+
+CPU usage and GC frequency are about 30% lower when the filtering result is pre-calculated.
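
Note on the pre-calculation steps in the blog post above: the idea of assigning bit positions per consumer, recording results in a per-message bit array at build time, and re-checking at pull time can be sketched as follows. This is a rough illustration under assumptions, not the broker's implementation. `PrecalcFilterSketch`, `positions`, `precalculate`, and `mayMatch` are hypothetical names, the sizes `BITS` and `HASHES` are arbitrary, bit positions are derived from `String.hashCode` instead of being assigned on registration, and `java.util.BitSet` stands in for `BitsArray`.

```java
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of Bloom-filter style pre-calculation; not RocketMQ code.
class PrecalcFilterSketch {
    static final int BITS = 64;   // size of the per-message bit array (assumption)
    static final int HASHES = 2;  // bit positions per consumer (assumption)

    // Stand-in for the positions a consumer gets when it registers with the broker.
    static int[] positions(String consumerGroup) {
        int[] pos = new int[HASHES];
        for (int i = 0; i < HASHES; i++) {
            pos[i] = Math.floorMod((consumerGroup + "#" + i).hashCode(), BITS);
        }
        return pos;
    }

    // Build time: evaluate each consumer's filter once per message and set
    // that consumer's bits when the message matches.
    static BitSet precalculate(Map<String, String> props,
                               Map<String, Predicate<Map<String, String>>> filters) {
        BitSet bits = new BitSet(BITS);
        for (Map.Entry<String, Predicate<Map<String, String>>> e : filters.entrySet()) {
            if (e.getValue().test(props)) {
                for (int p : positions(e.getKey())) {
                    bits.set(p);
                }
            }
        }
        return bits;
    }

    // Pull time: false means "definitely not matched"; true means "possibly
    // matched", since another consumer's bits may collide with this one's.
    static boolean mayMatch(BitSet bits, String consumerGroup) {
        for (int p : positions(consumerGroup)) {
            if (!bits.get(p)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Map<String, String>>> filters = new LinkedHashMap<>();
        filters.put("groupA", props -> Integer.parseInt(props.get("a")) > 5);
        filters.put("groupB", props -> Integer.parseInt(props.get("a")) < 0);

        Map<String, String> props = Collections.singletonMap("a", "10");
        BitSet bits = precalculate(props, filters);

        for (String group : filters.keySet()) {
            // A "maybe" still needs the real evaluation to rule out collisions.
            boolean deliver = mayMatch(bits, group) && filters.get(group).test(props);
            System.out.println(group + " deliver=" + deliver);
        }
    }
}
```

The saving comes from the `false` branch of `mayMatch`: messages that no bit check can match are skipped without decoding their properties, which is why the post recommends the optimization for large properties.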

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/42a8b8c9/assets/images/blog/filter_build_cq_apache.png
----------------------------------------------------------------------
diff --git a/assets/images/blog/filter_build_cq_apache.png b/assets/images/blog/filter_build_cq_apache.png
new file mode 100644
index 0000000..1a0a29a
Binary files /dev/null and b/assets/images/blog/filter_build_cq_apache.png differ

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/42a8b8c9/assets/images/blog/filter_structure_apach.png
----------------------------------------------------------------------
diff --git a/assets/images/blog/filter_structure_apach.png b/assets/images/blog/filter_structure_apach.png
new file mode 100644
index 0000000..aae5172
Binary files /dev/null and b/assets/images/blog/filter_structure_apach.png differ
