Repository: kylin
Updated Branches:
  refs/heads/master 870322003 -> 4adea1677


http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java 
b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 4168df2..09fcf4b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -96,7 +96,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleRunQuery() throws Exception {
 
-        String queryFileName = 
"src/test/resources/query/sql_tableau/query20.sql";
+        String queryFileName = "src/test/resources/query/temp/query01.sql";
 
         File sqlFile = new File(queryFileName);
         if (sqlFile.exists()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java 
b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
new file mode 100644
index 0000000..a92e298
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query;
+
+import java.io.File;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.dbunit.Assertion;
+import org.dbunit.database.DatabaseConfig;
+import org.dbunit.database.DatabaseConnection;
+import org.dbunit.database.IDatabaseConnection;
+import org.dbunit.dataset.ITable;
+import org.dbunit.ext.h2.H2Connection;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration test for the MASSIN filter function.
+ * <p>
+ * Every query under {@code src/test/resources/query/sql_massin} is run twice: against Kylin, where it
+ * filters SELLER_ID through {@code massin(..., 'vip_customers')}, and against H2, where that predicate is
+ * textually rewritten to an equivalent {@code IN (...)} list of the same seller ids. The two result
+ * tables must be equal.
+ * <p>
+ * The seller ids are random. The seed is printed on each run; pass {@code -Dkylin.test.massin.seed=N}
+ * to replay a failing run with the same ids.
+ */
+public class ITMassInQueryTest extends KylinTestBase {
+
+    /** File the random seller ids are written to, one per line (presumably read as the 'vip_customers' table). */
+    private static final String VIP_FILTER_FILE = "/tmp/vip_customers.txt";
+
+    /** Exact MASSIN predicate used by the sql_massin queries; H2 has no MASSIN, so it is rewritten textually. */
+    private static final String MASSIN_PREDICATE = "massin(test_kylin_fact.SELLER_ID,'vip_customers')";
+
+    /** System property that, when set, fixes the random seed used to pick the seller ids. */
+    private static final String SEED_PROPERTY = "kylin.test.massin.seed";
+
+    FileSystem fileSystem;
+    Set<Long> vipSellers;
+
+    @BeforeClass
+    public static void setUp() throws SQLException {
+        // nothing to do; the environment is rebuilt per test in setup()
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        // nothing to do; cleanup happens per test in after()
+    }
+
+    /**
+     * Rebuilds the inner-join test environment and writes a fresh random set of seller ids, one per line,
+     * to {@code /tmp/vip_customers.txt}.
+     */
+    @Before
+    public void setup() throws Exception {
+        ITKylinQueryTest.clean();
+        ITKylinQueryTest.joinType = "inner";
+        ITKylinQueryTest.setupAll();
+
+        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        fileSystem = FileSystem.get(hconf);
+
+        // a printed (or pinned) seed keeps failures reproducible despite the random filter
+        long seed = Long.getLong(SEED_PROPERTY, System.currentTimeMillis());
+        System.out.println("Random seed for vip sellers (" + SEED_PROPERTY + "): " + seed);
+        Random r = new Random(seed);
+
+        int sellerCount = 200;
+        vipSellers = Sets.newHashSet();
+        for (int i = 0; i < sellerCount; i++) {
+            // ids fall in [10000000, 10001500); duplicates collapse, so the set may hold fewer than sellerCount
+            vipSellers.add(10000000L + r.nextInt(1500));
+        }
+
+        Path path = new Path(VIP_FILTER_FILE);
+        fileSystem.delete(path, false);
+        FSDataOutputStream outputStream = fileSystem.create(path);
+        try {
+            // explicit charset instead of the platform default
+            org.apache.commons.io.IOUtils.write(StringUtils.join(vipSellers, "\n"), outputStream, "UTF-8");
+        } finally {
+            // close even when the write fails so the stream is not leaked
+            outputStream.close();
+        }
+
+        System.out.println("The filter is " + vipSellers);
+    }
+
+    @After
+    public void after() throws Exception {
+        ITKylinQueryTest.clean();
+    }
+
+    @Test
+    public void testMassInQuery() throws Exception {
+        compare("src/test/resources/query/sql_massin", null, true);
+    }
+
+    /**
+     * Runs every {@code .sql} file in {@code queryFolder} against Kylin and against H2 and asserts that the
+     * two result tables are equal. For H2 the MASSIN predicate is replaced by an IN list of
+     * {@link #vipSellers}.
+     *
+     * @param queryFolder     folder containing the query files
+     * @param exclusiveQuerys query names (file name without extension) to skip; {@code null} is accepted
+     *                        (see {@link #testMassInQuery()})
+     * @param needSort        forwarded to {@code executeQuery} for both engines
+     * @throws Exception if a query fails or the results differ
+     */
+    protected void compare(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception {
+        printInfo("---------- test folder: " + queryFolder);
+        Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys);
+
+        // vipSellers does not change while comparing, so the H2 replacement is built once
+        String h2InList = "test_kylin_fact.SELLER_ID in ( " + StringUtils.join(vipSellers, ",") + ")";
+
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            if (exclusiveSet.contains(queryName)) {
+                continue;
+            }
+            String sql = getTextFromFile(sqlFile);
+
+            // execute Kylin
+            printInfo("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
+            IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+
+            // execute H2: it has no MASSIN function, so substitute the equivalent IN list
+            sql = sql.replace(MASSIN_PREDICATE, h2InList);
+            printInfo("Query Result from H2 - " + queryName);
+            printInfo("Query for H2 - " + sql);
+            H2Connection h2Conn = new H2Connection(h2Connection, null);
+            h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
+            ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort);
+
+            try {
+                // compare the result
+                Assertion.assertEquals(h2Table, kylinTable);
+            } catch (Throwable t) {
+                printInfo("execAndCompQuery failed on: " + sqlFile.getAbsolutePath());
+                throw t;
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/resources/query/sql/query98.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql/query98.sql 
b/kylin-it/src/test/resources/query/sql/query98.sql
new file mode 100644
index 0000000..d73a4cb
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql/query98.sql
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- Sum of price per (leaf category, date) over the fact table alone.
+-- NOTE(review): the aliases look swapped (the sum is aliased x, the category id is aliased GMV);
+-- presumably deliberate, to exercise aliasing a dimension with a measure-like name -- confirm before changing.
+select cal_dt, sum(price) as x,leaf_categ_id as GMV 
+ from test_kylin_fact 
+ group by leaf_categ_id, cal_dt

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/resources/query/sql_massin/query01.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_massin/query01.sql 
b/kylin-it/src/test/resources/query/sql_massin/query01.sql
new file mode 100644
index 0000000..f21e5fb
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_massin/query01.sql
@@ -0,0 +1,30 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- Row count over the inner-join star schema, restricted by the MASSIN filter on SELLER_ID.
+-- ITMassInQueryTest rewrites the MASSIN predicate for H2 by exact string match, so keep its text byte-identical.
+SELECT 
+  count(*) as TRANS_CNT 
+ FROM test_kylin_fact 
+ inner JOIN edw.test_cal_dt as test_cal_dt 
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt 
+ inner JOIN test_category_groupings 
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND 
test_kylin_fact.lstg_site_id = test_category_groupings.site_id 
+ inner JOIN edw.test_sites as test_sites 
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id 
+ inner JOIN edw.test_seller_type_dim as test_seller_type_dim 
+ ON test_kylin_fact.slr_segment_cd = test_seller_type_dim.seller_type_cd 
+ where massin(test_kylin_fact.SELLER_ID,'vip_customers') 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/resources/query/sql_massin/query02.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_massin/query02.sql 
b/kylin-it/src/test/resources/query/sql_massin/query02.sql
new file mode 100644
index 0000000..8981b75
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_massin/query02.sql
@@ -0,0 +1,30 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+
+-- GMV and row count per leaf category, restricted by the MASSIN filter on SELLER_ID.
+-- ITMassInQueryTest rewrites the MASSIN predicate for H2 by exact string match, so keep its text byte-identical.
+ select test_kylin_fact.leaf_categ_id,sum(price) as GMV, count(*) as x
+ from test_kylin_fact inner JOIN edw.test_cal_dt as test_cal_dt 
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt 
+ inner JOIN test_category_groupings 
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND 
test_kylin_fact.lstg_site_id = test_category_groupings.site_id 
+ inner JOIN edw.test_sites as test_sites 
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id 
+ inner JOIN edw.test_seller_type_dim as test_seller_type_dim 
+ ON test_kylin_fact.slr_segment_cd = test_seller_type_dim.seller_type_cd 
+  where massin(test_kylin_fact.SELLER_ID,'vip_customers') 
+ group by test_kylin_fact.leaf_categ_id

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/resources/query/sql_massin/query03.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_massin/query03.sql 
b/kylin-it/src/test/resources/query/sql_massin/query03.sql
new file mode 100644
index 0000000..27a5918
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_massin/query03.sql
@@ -0,0 +1,30 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+
+-- GMV and row count per listing format, keeping only formats with more than 2 rows, restricted by MASSIN on SELLER_ID.
+-- ITMassInQueryTest rewrites the MASSIN predicate for H2 by exact string match, so keep its text byte-identical.
+ select test_kylin_fact.lstg_format_name, sum(price) as GMV, count(*) as x
+ from test_kylin_fact inner JOIN edw.test_cal_dt as test_cal_dt 
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt 
+ inner JOIN test_category_groupings 
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND 
test_kylin_fact.lstg_site_id = test_category_groupings.site_id 
+ inner JOIN edw.test_sites as test_sites 
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id 
+ inner JOIN edw.test_seller_type_dim as test_seller_type_dim 
+ ON test_kylin_fact.slr_segment_cd = test_seller_type_dim.seller_type_cd 
+ where massin(test_kylin_fact.SELLER_ID,'vip_customers') 
+ group by test_kylin_fact.lstg_format_name having count(*) > 2 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/resources/query/sql_massin/query04.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_massin/query04.sql 
b/kylin-it/src/test/resources/query/sql_massin/query04.sql
new file mode 100644
index 0000000..f11939c
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_massin/query04.sql
@@ -0,0 +1,27 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- Total GMV and row count for dates before 2012-05-01 or after 2013-05-01, combined with the MASSIN filter on SELLER_ID.
+-- ITMassInQueryTest rewrites the MASSIN predicate for H2 by exact string match, so keep its text byte-identical.
+select sum(price) as GMV, count(*) as TRANS_CNT  FROM test_kylin_fact
+ inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id
+ AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id 
+where (test_kylin_fact.cal_dt < DATE '2012-05-01' or test_kylin_fact.cal_dt > 
DATE '2013-05-01')
+and massin(test_kylin_fact.SELLER_ID,'vip_customers') 
+ 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
----------------------------------------------------------------------
diff --git 
a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java 
b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index 5ea138f..2806b4b 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -57,7 +57,7 @@ import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
 import org.apache.kylin.metadata.filter.DynamicTupleFilter;
 import org.apache.kylin.metadata.filter.ExtractTupleFilter;
-import org.apache.kylin.metadata.filter.FunctionTupleFilter;
+import org.apache.kylin.metadata.filter.function.Functions;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
@@ -130,12 +130,12 @@ public class OLAPFilterRel extends Filter implements 
OLAPRel {
                 if (op.getName().equalsIgnoreCase("extract_date")) {
                     filter = new 
ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
                 } else {
-                    filter = new FunctionTupleFilter(op.getName());
+                    filter = Functions.getFunctionTupleFilter(op.getName());
                 }
                 break;
             case LIKE:
             case OTHER_FUNCTION:
-                filter = new FunctionTupleFilter(op.getName());
+                filter = Functions.getFunctionTupleFilter(op.getName());
                 break;
             default:
                 throw new UnsupportedOperationException(op.getName());

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
----------------------------------------------------------------------
diff --git 
a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java 
b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
index f8711b1..44e9e47 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
@@ -29,15 +29,21 @@ import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaFactory;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.util.ConversionUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.model.DatabaseDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
 public class OLAPSchemaFactory implements SchemaFactory {
+    public static final Logger logger = 
LoggerFactory.getLogger(OLAPSchemaFactory.class);
+
 
     static {
         /*
@@ -107,7 +113,13 @@ public class OLAPSchemaFactory implements SchemaFactory {
                 out.write("            \"factory\": 
\"org.apache.kylin.query.schema.OLAPSchemaFactory\",\n");
                 out.write("            \"operand\": {\n");
                 out.write("                \"" + SCHEMA_PROJECT + "\": \"" + 
project + "\"\n");
-                out.write("            }\n");
+                out.write("            },\n");
+                out.write("            \"functions\": [\n");
+                out.write("               {\n");
+                out.write("                   name: 'MASSIN',\n");
+                out.write("                   className: 
'org.apache.kylin.query.udf.MassInUDF'\n");
+                out.write("               }\n");
+                out.write("              ]\n");
                 out.write("        }\n");
 
                 if (++counter != schemaCounts.size()) {
@@ -118,8 +130,10 @@ public class OLAPSchemaFactory implements SchemaFactory {
             out.write("    ]\n");
             out.write("}\n");
             out.close();
-
             tmp.deleteOnExit();
+
+            logger.info("Schema json:" + 
StringUtils.join(FileUtils.readLines(tmp), "\n"));
+
             return tmp;
 
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/query/src/main/java/org/apache/kylin/query/udf/MassInUDF.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/udf/MassInUDF.java 
b/query/src/main/java/org/apache/kylin/query/udf/MassInUDF.java
new file mode 100644
index 0000000..596fa1e
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/udf/MassInUDF.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.udf;
+
+import org.apache.calcite.linq4j.function.Parameter;
+
+/**
+ * Calcite-side declaration of the {@code MASSIN(col, filterTable)} function. OLAPSchemaFactory registers
+ * this class under the name MASSIN in the generated schema JSON.
+ * <p>
+ * {@link #eval} deliberately accepts every row. NOTE(review): this appears to rely on the real filtering
+ * being pushed down to storage (OLAPFilterRel resolves function filters through
+ * {@code Functions.getFunctionTupleFilter}). If Calcite ever evaluated this without that push-down, no
+ * row would be filtered out -- confirm.
+ */
+public class MassInUDF {
+
+    /** Verdict returned for every row; see the class comment. */
+    private static final boolean ACCEPT_ALL = true;
+
+    /**
+     * Placeholder evaluation of MASSIN.
+     *
+     * @param col         value of the filtered column (not inspected here)
+     * @param filterTable name of the external filter table (not inspected here)
+     * @return always {@code true}
+     */
+    public boolean eval(@Parameter(name = "col") Object col, @Parameter(name = "filterTable") String filterTable) {
+        return ACCEPT_ALL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java 
b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 9a6036d..4ec2116 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -200,6 +200,11 @@ public class CacheService extends BasicService {
                 IIDescManager.clearCache();
                 CubeDescManager.clearCache();
                 break;
+            case EXTERNAL_FILTER:
+                getMetadataManager().reloadTableCache(cacheKey);
+                IIDescManager.clearCache();
+                CubeDescManager.clearCache();
+                break;
             case DATA_MODEL:
                 getMetadataManager().reloadDataModelDesc(cacheKey);
                 IIDescManager.clearCache();
@@ -262,6 +267,8 @@ public class CacheService extends BasicService {
                 break;
             case TABLE:
                 throw new UnsupportedOperationException(log);
+            case EXTERNAL_FILTER:
+                throw new UnsupportedOperationException(log);
             case DATA_MODEL:
                 getMetadataManager().removeModelCache(cacheKey);
                 break;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
index a782020..f87bd30 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
@@ -24,7 +24,7 @@ import java.util.Set;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowKeyColumnIO;
 import org.apache.kylin.dict.DictCodeSystem;
-import org.apache.kylin.dict.TupleFilterFunctionTransformer;
+import org.apache.kylin.dict.BuildInFunctionTransformer;
 import org.apache.kylin.dimension.Dictionary;
 import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.dimension.IDimensionEncodingMap;
@@ -151,7 +151,7 @@ public class FilterDecorator implements 
TupleFilterSerializer.Decorator {
         if (filter == null)
             return null;
 
-        TupleFilterFunctionTransformer translator = new 
TupleFilterFunctionTransformer(dimEncMap);
+        BuildInFunctionTransformer translator = new 
BuildInFunctionTransformer(dimEncMap);
         filter = translator.transform(filter);
 
         // un-evaluatable filter is replaced with TRUE

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index e6f9ac1..bde2196 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -135,7 +135,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             }
             try {
                 current++;
-                return queue.poll(timeout, TimeUnit.MILLISECONDS);
+                byte[] ret = queue.poll(timeout, TimeUnit.MILLISECONDS);
+                if (ret == null) {
+                    throw new RuntimeException("Timeout visiting cube!");
+                } else {
+                    return ret;
+                }
             } catch (InterruptedException e) {
                 throw new RuntimeException("error when waiting queue", e);
             }
@@ -236,7 +241,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             return 
Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), 
getByteArrayForShort((short) (baseShard + shardNum - 1))));
         } else {
             //0,1,2,3,4 wants 4,0
-            return 
Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), 
getByteArrayForShort((short) (totalShards - 1))),//
+            return 
Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), 
getByteArrayForShort((short) (totalShards - 1))), //
                     Pair.newPair(getByteArrayForShort((short) 0), 
getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1))));
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index c69d31f..ea38508 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -40,7 +40,8 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.TupleFilterFunctionTransformer;
+import org.apache.kylin.dict.BuildInFunctionTransformer;
+import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.gridtable.EmptyGTScanner;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
@@ -52,8 +53,10 @@ import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
 import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import 
org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +76,7 @@ public class CubeSegmentScanner implements IGTScanner {
     final Cuboid cuboid;
 
     public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, 
Set<TblColRef> dimensions, Set<TblColRef> groups, //
-            Collection<FunctionDesc> metrics, TupleFilter filter, boolean 
allowPreAggregate)  {
+            Collection<FunctionDesc> metrics, TupleFilter filter, boolean 
allowPreAggregate) {
         this.cuboid = cuboid;
         this.cubeSeg = cubeSeg;
         this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
@@ -81,7 +84,7 @@ public class CubeSegmentScanner implements IGTScanner {
         CuboidToGridTableMapping mapping = 
cuboid.getCuboidToGridTableMapping();
 
         // translate FunctionTupleFilter to IN clause
-        ITupleFilterTransformer translator = new 
TupleFilterFunctionTransformer(cubeSeg.getDimensionEncodingMap());
+        ITupleFilterTransformer translator = new 
BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
         filter = translator.transform(filter);
 
         //replace the constant values in filter to dictionary codes 
@@ -242,6 +245,12 @@ public class CubeSegmentScanner implements IGTScanner {
         public Scanner() {
             CubeHBaseRPC rpc;
             if 
("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
+                MassInTupleFilter.VALUE_PROVIDER_FACTORY = new 
MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() 
{
+                    @Override
+                    public DimensionEncoding getDimEnc(TblColRef col) {
+                        return 
info.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
+                    }
+                });
                 rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info);
             } else {
                 rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, 
info);//default behavior

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 7ac4be1..9f42c1c 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -40,10 +40,13 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
 import org.apache.kylin.storage.hbase.cube.v2.CellListIterator;
 import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC;
@@ -51,6 +54,7 @@ import 
org.apache.kylin.storage.hbase.cube.v2.HBaseReadonlyStore;
 import org.apache.kylin.storage.hbase.cube.v2.RawScan;
 import 
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
 import 
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
+import 
org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,6 +145,7 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
 
     @Override
     public void visitCube(RpcController controller, 
CubeVisitProtos.CubeVisitRequest request, 
RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
+
         RegionScanner innerScanner = null;
         HRegion region = null;
 
@@ -153,8 +158,15 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             region = env.getRegion();
             region.startRegionOperation();
 
-            GTScanRequest scanReq = 
GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
-            RawScan hbaseRawScan = 
RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
+            final GTScanRequest scanReq = 
GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
+            final RawScan hbaseRawScan = 
RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
+
+            MassInTupleFilter.VALUE_PROVIDER_FACTORY = new 
MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() 
{
+                @Override
+                public DimensionEncoding getDimEnc(TblColRef col) {
+                    return 
scanReq.getInfo().getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
+                }
+            });
 
             List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
             for (IntList intList : request.getHbaseColumnsToGTList()) {
@@ -165,7 +177,7 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                 //if has shard, fill region shard to raw scan start/end
                 updateRawScanByCurrentRegion(hbaseRawScan, region, 
request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
             }
-            
+
             Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
 
             appendProfileInfo(sb);
@@ -181,8 +193,8 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, 
hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
             IGTScanner rawScanner = store.scan(scanReq);
 
-            IGTScanner finalScanner = scanReq.decorateScanner(rawScanner,//
-                    behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER.ordinal(),//
+            IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
+                    behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER.ordinal(), //
                     behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
 
             ByteBuffer buffer = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
@@ -226,7 +238,8 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                             setFreeSwapSpaceSize(freeSwapSpaceSize).//
                             
setHostname(InetAddress.getLocalHost().getHostName()).// 
                             setEtcMsg(sb.toString()).//
-                            build()).//
+                            build())
+                    .//
                     build());
 
         } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderFactoryImpl.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderFactoryImpl.java
new file mode 100644
index 0000000..fe6276f
--- /dev/null
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderFactoryImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2.filter;
+
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.metadata.filter.UDF.MassInValueProvider;
+import org.apache.kylin.metadata.filter.UDF.MassInValueProviderFactory;
+import org.apache.kylin.metadata.filter.function.Functions;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class MassInValueProviderFactoryImpl implements 
MassInValueProviderFactory {
+
+    public interface DimEncAware {
+        DimensionEncoding getDimEnc(TblColRef col);
+    }
+
+    private DimEncAware dimEncAware = null;
+
+    public MassInValueProviderFactoryImpl(DimEncAware dimEncAware) {
+        this.dimEncAware = dimEncAware;
+    }
+
+    @Override
+    public MassInValueProvider getProvider(Functions.FilterTableType 
filterTableType, String filterResourceIdentifier, TblColRef col) {
+        return new MassInValueProviderImpl(filterTableType, 
filterResourceIdentifier, dimEncAware.getDimEnc(col));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
new file mode 100644
index 0000000..83a6671
--- /dev/null
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2.filter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.metadata.filter.UDF.MassInValueProvider;
+import org.apache.kylin.metadata.filter.function.Functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+
+public class MassInValueProviderImpl implements MassInValueProvider {
+    public static final Logger logger = 
LoggerFactory.getLogger(MassInValueProviderImpl.class);
+
+    private Set<ByteArray> ret = Sets.newHashSet();
+
+    public MassInValueProviderImpl(Functions.FilterTableType filterTableType, 
String filterResourceIdentifier, DimensionEncoding encoding) {
+
+        if (filterTableType == Functions.FilterTableType.HDFS) {
+
+            logger.info("Start to load HDFS filter table from " + 
filterResourceIdentifier);
+            Stopwatch stopwatch = new Stopwatch().start();
+
+            FileSystem fileSystem;
+            try {
+                fileSystem = FileSystem.get(HBaseConfiguration.create());
+                InputStream inputStream = fileSystem.open(new 
Path(filterResourceIdentifier));
+                List<String> lines = IOUtils.readLines(inputStream);
+
+                logger.info("Load HDFS finished after " + 
stopwatch.elapsedMillis() + " millis");
+
+                for (String line : lines) {
+                    ByteArray byteArray = 
ByteArray.allocate(encoding.getLengthOfEncoding());
+                    encoding.encode(line.getBytes(), line.getBytes().length, 
byteArray.array(), 0);
+                    ret.add(byteArray);
+                }
+
+                logger.info("Mass In values constructed after " + 
stopwatch.elapsedMillis() + " millis, containing " + ret.size() + " entries");
+
+            } catch (IOException e) {
+                throw new RuntimeException("error when loading the mass in 
values", e);
+            }
+        } else {
+            throw new RuntimeException("HBASE_TABLE FilterTableType Not 
supported yet");
+        }
+    }
+
+    @Override
+    public Set<?> getMassInValues() {
+        return ret;
+    }
+}

Reply via email to