[
https://issues.apache.org/jira/browse/ROCKETMQ-121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15948846#comment-15948846
]
ASF GitHub Bot commented on ROCKETMQ-121:
-----------------------------------------
Github user vongosling commented on a diff in the pull request:
https://github.com/apache/incubator-rocketmq/pull/82#discussion_r108883657
--- Diff:
broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java
---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.store.CommitLogDispatcher;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Calculate bit map of filter.
+ */
+public class CommitLogDispatcherCalcBitMap implements CommitLogDispatcher {
+
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
+
+ protected final BrokerConfig brokerConfig;
+ protected final ConsumerFilterManager consumerFilterManager;
+
+ public CommitLogDispatcherCalcBitMap(BrokerConfig brokerConfig,
ConsumerFilterManager consumerFilterManager) {
+ this.brokerConfig = brokerConfig;
+ this.consumerFilterManager = consumerFilterManager;
+ }
+
+ @Override
+ public void dispatch(DispatchRequest request) {
+ if (!this.brokerConfig.isEnableCalcFilterBitMap()) {
+ return;
+ }
+
+ try {
+
+ Collection<ConsumerFilterData> filterDatas =
consumerFilterManager.get(request.getTopic());
+
+ if (filterDatas == null || filterDatas.isEmpty()) {
+ return;
+ }
+
+ Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
+ BitsArray filterBitMap = BitsArray.create(
+ this.consumerFilterManager.getBloomFilter().getM()
+ );
+
+ long startTime = System.currentTimeMillis();
+ while (iterator.hasNext()) {
+ ConsumerFilterData filterData = iterator.next();
+
+ if (filterData.getCompiledExpression() == null) {
+ log.error("[BUG] Consumer in filter manager has no
compiled expression! {}", filterData);
+ continue;
+ }
+
+ if (filterData.getBloomFilterData() == null) {
+ log.error("[BUG] Consumer in filter manager has no
bloom data! {}", filterData);
+ continue;
+ }
+
+ Object ret = null;
+ try {
+ MessageEvaluationContext context = new
MessageEvaluationContext(request.getPropertiesMap());
+
+ ret =
filterData.getCompiledExpression().evaluate(context);
+ } catch (Throwable e) {
+ log.error("Calc filter bit map error!commitLogOffset="
+ request.getCommitLogOffset() +
+ ", consumer=" + filterData, e);
+ }
+
+ log.debug("Result of Calc bit map:ret={}, data={},
props={}, offset={}", ret, filterData, request.getPropertiesMap(),
request.getCommitLogOffset());
+
+ // eval true
+ if (ret != null && ret instanceof Boolean && (Boolean)
ret) {
+ consumerFilterManager.getBloomFilter().hashTo(
+ filterData.getBloomFilterData(),
+ filterBitMap
+ );
+ }
+ }
+
+ request.setBitMap(filterBitMap.bytes());
+
+ long eclipseTime = System.currentTimeMillis() - startTime;
+ // 1ms
+ if (eclipseTime >= 1) {
+ log.warn("Spend {} ms to calc bit map, consumerNum={},
topic={}", eclipseTime, filterDatas.size(), request.getTopic());
+ }
+ } catch (Throwable e) {
+ log.error("Calc bit map error! topic=" + request.getTopic() +
", offset=" + request.getCommitLogOffset()
--- End diff --
Follow the previous comment
> Support message filtering based on SQL92
> ----------------------------------------
>
> Key: ROCKETMQ-121
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-121
> Project: Apache RocketMQ
> Issue Type: Wish
> Components: rocketmq-client, rocketmq-store
> Reporter: yukon
> Assignee: vongosling
> Priority: Minor
>
> So far, RocketMQ only support message filtering feature by `TAG`, but one
> message only can own one tag, this is too limited to meet complex business
> requirements.
> So, we want to define and implement a reasonable filter language based on a
> subset of the SQL 92 expression syntax to support customized message
> filtering.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)