Repository: kylin
Updated Branches:
  refs/heads/master 61a08d429 -> 0018a2124


KYLIN-2088 Support intersect count for calculation of retention or conversion 
rates

Signed-off-by: Yang Li <liy...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b4c970ad
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b4c970ad
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b4c970ad

Branch: refs/heads/master
Commit: b4c970adf18362daade77e936693dac08c0639e1
Parents: 61a08d4
Author: sunyerui <sunye...@gmail.com>
Authored: Wed Oct 12 20:59:54 2016 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Sun Oct 16 08:10:05 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 +-
 .../org/apache/kylin/measure/MeasureType.java   |  5 ++
 .../kylin/measure/bitmap/BitmapCounter.java     | 32 +++++++
 .../BitmapIntersectDistinctCountAggFunc.java    | 94 ++++++++++++++++++++
 .../kylin/measure/bitmap/BitmapMeasureType.java |  9 ++
 .../apache/kylin/query/ITKylinQueryTest.java    |  6 ++
 .../query/sql_intersect_count/query00.sql       | 32 +++++++
 .../kylin/query/relnode/OLAPAggregateRel.java   | 16 +++-
 8 files changed, 195 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 99c3c5a..7dacd06 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -765,7 +765,9 @@ abstract public class KylinConfigBase implements 
Serializable {
     }
 
     public Map<String, String> getUDFs() {
-        return getPropertiesByPrefix("kylin.query.udf.");
+        Map<String, String> udfMap = getPropertiesByPrefix("kylin.query.udf.");
+        udfMap.put("intersect_count", 
"org.apache.kylin.measure.bitmap.BitmapIntersectDistinctCountAggFunc");
+        return udfMap;
     }
 
     public int getHBaseMaxConnectionThreads() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 82618e9..e7312f2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -115,6 +115,11 @@ abstract public class MeasureType<T> {
     /** Returns a Calcite aggregation function implementation class */
     abstract public Class<?> getRewriteCalciteAggrFunctionClass();
 
+    /** Some measures may return a different class depending on the call name, 
e.g. BitmapMeasureType */
+    public Class<?> getRewriteCalciteAggrFunctionClass(String callName) {
+        return getRewriteCalciteAggrFunctionClass();
+    }
+
     /* 
============================================================================
      * Storage
      * 
---------------------------------------------------------------------------- */

http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
index d3b57a7..827390d 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
@@ -47,6 +47,12 @@ public class BitmapCounter implements 
Comparable<BitmapCounter> {
         bitmap.clear();
     }
 
+    public BitmapCounter clone() {
+        BitmapCounter newCounter = new BitmapCounter();
+        newCounter.bitmap = bitmap.clone();
+        return newCounter;
+    }
+
     public void add(int value) {
         bitmap.add(value);
     }
@@ -74,6 +80,10 @@ public class BitmapCounter implements 
Comparable<BitmapCounter> {
         this.bitmap.or(another.bitmap);
     }
 
+    public void intersect(BitmapCounter another) {
+        this.bitmap.and(another.bitmap);
+    }
+
     public long getCount() {
         return this.bitmap.getCardinality();
     }
@@ -107,6 +117,28 @@ public class BitmapCounter implements 
Comparable<BitmapCounter> {
     }
 
     @Override
+    public String toString() {
+        long count = getCount();
+        if (count <= 10) {
+            return "(" + count + ")" + bitmap.toString();
+        } else {
+            StringBuilder sb = new StringBuilder();
+            sb.append("(").append(count).append("){");
+            int values = 0;
+            for (Integer v : bitmap) {
+                if (values++ < 10) {
+                    sb.append(v).append(",");
+                } else {
+                    sb.append("...");
+                    break;
+                }
+            }
+            sb.append("}");
+            return sb.toString();
+        }
+    }
+
+    @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
new file mode 100644
index 0000000..cf42d1b
--- /dev/null
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.measure.bitmap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BitmapIntersectDistinctCountAggFunc is a UDAF used for calculating the 
intersection of two or more bitmaps
+ * Usage:   intersect_count(columnToCount, columnToFilter, filterList)
+ * Example: intersect_count(uuid, event, array['A', 'B', 'C']), meaning count 
the uuids that appear in all three bitmaps of A, B and C
+ *          requires a bitmap count distinct measure of uuid, and a 
dimension of event
+ */
+public class BitmapIntersectDistinctCountAggFunc {
+    private static final Logger logger = 
LoggerFactory.getLogger(BitmapIntersectDistinctCountAggFunc.class);
+
+    public static class RetentionPartialResult {
+        Map<Object, BitmapCounter> map;
+        List keyList;
+
+        public RetentionPartialResult() {
+            map = new LinkedHashMap<>();
+        }
+
+        public void add(Object key, List keyList, Object value) {
+            if (this.keyList == null) {
+                this.keyList = keyList;
+            }
+            BitmapCounter counter = map.get(key);
+            if (counter == null) {
+                counter = new BitmapCounter();
+                map.put(key, counter);
+            }
+            counter.merge((BitmapCounter)value);
+        }
+
+        public long result() {
+            if (keyList == null || keyList.isEmpty()) {
+                return 0;
+            }
+            BitmapCounter counter = null;
+            for (Object key : keyList) {
+                BitmapCounter c = map.get(key);
+                if (c == null) {
+                    // We have a key in filter list but not in map, meaning 
there's no intersect data
+                    return 0;
+                } else {
+                    if (counter == null) {
+                        counter = c.clone();
+                    }
+                    counter.intersect(c);
+                }
+            }
+            return counter.getCount();
+        }
+    }
+
+    public static RetentionPartialResult init() {
+        return new RetentionPartialResult();
+    }
+
+    public static RetentionPartialResult add(RetentionPartialResult result, 
Object value, Object key, List keyList) {
+        result.add(key, keyList, value);
+        return result;
+    }
+
+    public static RetentionPartialResult merge(RetentionPartialResult result, 
Object value, Object key, List keyList) {
+        return add(result, value, key, keyList);
+    }
+
+    public static long result(RetentionPartialResult result) {
+        return result.result();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index be96eb5..2b88e21 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -39,6 +39,7 @@ import org.apache.kylin.metadata.model.TblColRef;
  */
 public class BitmapMeasureType extends MeasureType<BitmapCounter> {
     public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+    public static final String FUNC_INTERSECT_COUNT_DISTINCT = 
"INTERSECT_COUNT";
     public static final String DATATYPE_BITMAP = "bitmap";
 
     public static class Factory extends MeasureTypeFactory<BitmapCounter> {
@@ -160,6 +161,14 @@ public class BitmapMeasureType extends 
MeasureType<BitmapCounter> {
         return BitmapDistinctCountAggFunc.class;
     }
 
+    @Override
+    public Class<?> getRewriteCalciteAggrFunctionClass(String callName) {
+        if (callName != null && 
callName.equalsIgnoreCase(FUNC_INTERSECT_COUNT_DISTINCT)) {
+            return BitmapIntersectDistinctCountAggFunc.class;
+        }
+        return BitmapDistinctCountAggFunc.class;
+    }
+
     // In order to keep compatibility with old version, tinyint/smallint/int 
column use value directly, without dictionary
     private boolean needDictionaryColumn(FunctionDesc functionDesc) {
         DataType dataType = 
functionDesc.getParameter().getColRefs().get(0).getType();

http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java 
b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 59a3a04..a0706ca 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -355,4 +355,10 @@ public class ITKylinQueryTest extends KylinTestBase {
         this.batchExecuteQuery(getQueryFolderPrefix() + 
"src/test/resources/query/sql_window");
     }
 
+    @Test
+    public void testIntersectCountQuery() throws Exception {
+        // cannot compare results because H2 does not support intersect_count yet
+        this.batchExecuteQuery(getQueryFolderPrefix() + 
"src/test/resources/query/sql_intersect_count");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql 
b/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql
new file mode 100644
index 0000000..15e274a
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql
@@ -0,0 +1,32 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select
+week_beg_dt as week,
+intersect_count( seller_id, lstg_format_name, array['FP-GTC']) as a,
+intersect_count( seller_id, lstg_format_name, array['Auction']) as b,
+intersect_count( seller_id, lstg_format_name, array['Others']) as c,
+intersect_count( seller_id, lstg_format_name, array['FP-GTC', 'Auction']) as 
ab,
+intersect_count( seller_id, lstg_format_name, array['FP-GTC', 'Others']) as ac,
+intersect_count( seller_id, lstg_format_name, array['FP-GTC', 'Auction', 
'Others']) as abc,
+count(distinct seller_id) as sellers,
+count(*) as cnt
+from test_kylin_fact left join edw.test_cal_dt on test_kylin_fact.cal_dt = 
edw.test_cal_dt.CAL_DT
+where week_beg_dt in (DATE '2013-12-22', DATE '2012-06-23')
+group by week_beg_dt
+

http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git 
a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java 
b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 97efb27..8ecb808 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -56,6 +56,7 @@ import 
org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.measure.bitmap.BitmapMeasureType;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -79,6 +80,7 @@ public class OLAPAggregateRel extends Aggregate implements 
OLAPRel {
         AGGR_FUNC_MAP.put("$SUM0", "SUM");
         AGGR_FUNC_MAP.put("COUNT", "COUNT");
         AGGR_FUNC_MAP.put("COUNT_DISTINCT", "COUNT_DISTINCT");
+        AGGR_FUNC_MAP.put(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT, 
"COUNT_DISTINCT");
         AGGR_FUNC_MAP.put("MAX", "MAX");
         AGGR_FUNC_MAP.put("MIN", "MIN");
 
@@ -224,6 +226,15 @@ public class OLAPAggregateRel extends Aggregate implements 
OLAPRel {
             Set<TblColRef> columns = 
inputColumnRowType.getSourceColumnsByIndex(i);
             this.groups.addAll(columns);
         }
+        // Some UDAFs group data by themselves; add their group key into groups to 
prevent aggregation on the cube storage server side
+        for (AggregateCall aggCall : this.rewriteAggCalls) {
+            String aggregateName = aggCall.getAggregation().getName();
+            if 
(aggregateName.equalsIgnoreCase(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT))
 {
+                int index = aggCall.getArgList().get(1);
+                TblColRef column = inputColumnRowType.getColumnByIndex(index);
+                groups.add(column);
+            }
+        }
     }
 
     private void buildAggregations() {
@@ -380,16 +391,17 @@ public class OLAPAggregateRel extends Aggregate 
implements OLAPRel {
         }
 
         // rebuild function
+        String callName = aggCall.getAggregation().getName();
         RelDataType fieldType = aggCall.getType();
         SqlAggFunction newAgg = aggCall.getAggregation();
         if (func.isCount()) {
             newAgg = SqlStdOperatorTable.SUM0;
         } else if (func.getMeasureType().getRewriteCalciteAggrFunctionClass() 
!= null) {
-            newAgg = createCustomAggFunction(func.getExpression(), fieldType, 
func.getMeasureType().getRewriteCalciteAggrFunctionClass());
+            newAgg = createCustomAggFunction(callName, fieldType, 
func.getMeasureType().getRewriteCalciteAggrFunctionClass(callName));
         }
 
         // rebuild aggregate call
-        AggregateCall newAggCall = new AggregateCall(newAgg, false, 
newArgList, fieldType, newAgg.getName());
+        AggregateCall newAggCall = new AggregateCall(newAgg, false, 
newArgList, fieldType, callName);
         return newAggCall;
     }
 

Reply via email to