Repository: kylin Updated Branches: refs/heads/master 870322003 -> 4adea1677
http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index 4168df2..09fcf4b 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -96,7 +96,7 @@ public class ITKylinQueryTest extends KylinTestBase { @Test public void testSingleRunQuery() throws Exception { - String queryFileName = "src/test/resources/query/sql_tableau/query20.sql"; + String queryFileName = "src/test/resources/query/temp/query01.sql"; File sqlFile = new File(queryFileName); if (sqlFile.exists()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java new file mode 100644 index 0000000..a92e298 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query; + +import java.io.File; +import java.sql.SQLException; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.dbunit.Assertion; +import org.dbunit.database.DatabaseConfig; +import org.dbunit.database.DatabaseConnection; +import org.dbunit.database.IDatabaseConnection; +import org.dbunit.dataset.ITable; +import org.dbunit.ext.h2.H2Connection; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + */ +public class ITMassInQueryTest extends KylinTestBase { + + FileSystem fileSystem; + Set<Long> vipSellers; + + @BeforeClass + public static void setUp() throws SQLException { + } + + @AfterClass + public static void tearDown() { + } + + @Before + public void setup() throws Exception { + + ITKylinQueryTest.clean(); + ITKylinQueryTest.joinType = "inner"; + ITKylinQueryTest.setupAll(); + + Configuration hconf = HadoopUtil.getCurrentConfiguration(); + fileSystem = FileSystem.get(hconf); + + int sellerCount = 200; + Random r = new Random(); + vipSellers = Sets.newHashSet(); + for (int i = 0; i < sellerCount; i++) { + vipSellers.add(10000000L + r.nextInt(1500)); + } + + Path path = new 
Path("/tmp/vip_customers.txt"); + fileSystem.delete(path, false); + FSDataOutputStream outputStream = fileSystem.create(path); + org.apache.commons.io.IOUtils.write(StringUtils.join(vipSellers, "\n"), outputStream); + outputStream.close(); + + System.out.println("The filter is " + vipSellers); + } + + @After + public void after() throws Exception { + ITKylinQueryTest.clean(); + + } + + @Test + public void testMassInQuery() throws Exception { + compare("src/test/resources/query/sql_massin", null, true); + } + + protected void compare(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception { + printInfo("---------- test folder: " + queryFolder); + Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys); + + List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql"); + for (File sqlFile : sqlFiles) { + String queryName = StringUtils.split(sqlFile.getName(), '.')[0]; + if (exclusiveSet.contains(queryName)) { + continue; + } + String sql = getTextFromFile(sqlFile); + + // execute Kylin + printInfo("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); + IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); + ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort); + + // execute H2 + sql = sql.replace("massin(test_kylin_fact.SELLER_ID,'vip_customers')", "test_kylin_fact.SELLER_ID in ( " + org.apache.commons.lang.StringUtils.join(vipSellers, ",") + ")"); + printInfo("Query Result from H2 - " + queryName); + printInfo("Query for H2 - " + sql); + H2Connection h2Conn = new H2Connection(h2Connection, null); + h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory()); + ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort); + + try { + // compare the result + Assertion.assertEquals(h2Table, kylinTable); + } catch (Throwable t) { + printInfo("execAndCompQuery failed on: " + sqlFile.getAbsolutePath()); + throw t; + } + } + } + +} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/resources/query/sql/query98.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql/query98.sql b/kylin-it/src/test/resources/query/sql/query98.sql new file mode 100644 index 0000000..d73a4cb --- /dev/null +++ b/kylin-it/src/test/resources/query/sql/query98.sql @@ -0,0 +1,21 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +select cal_dt, sum(price) as x,leaf_categ_id as GMV + from test_kylin_fact + group by leaf_categ_id, cal_dt http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/resources/query/sql_massin/query01.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_massin/query01.sql b/kylin-it/src/test/resources/query/sql_massin/query01.sql new file mode 100644 index 0000000..f21e5fb --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_massin/query01.sql @@ -0,0 +1,30 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. 
See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +SELECT + count(*) as TRANS_CNT + FROM test_kylin_fact + inner JOIN edw.test_cal_dt as test_cal_dt + ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt + inner JOIN test_category_groupings + ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id + inner JOIN edw.test_sites as test_sites + ON test_kylin_fact.lstg_site_id = test_sites.site_id + inner JOIN edw.test_seller_type_dim as test_seller_type_dim + ON test_kylin_fact.slr_segment_cd = test_seller_type_dim.seller_type_cd + where massin(test_kylin_fact.SELLER_ID,'vip_customers') http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/resources/query/sql_massin/query02.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_massin/query02.sql b/kylin-it/src/test/resources/query/sql_massin/query02.sql new file mode 100644 index 0000000..8981b75 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_massin/query02.sql @@ -0,0 +1,30 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. 
See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + + + select test_kylin_fact.leaf_categ_id,sum(price) as GMV, count(*) as x + from test_kylin_fact inner JOIN edw.test_cal_dt as test_cal_dt + ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt + inner JOIN test_category_groupings + ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id + inner JOIN edw.test_sites as test_sites + ON test_kylin_fact.lstg_site_id = test_sites.site_id + inner JOIN edw.test_seller_type_dim as test_seller_type_dim + ON test_kylin_fact.slr_segment_cd = test_seller_type_dim.seller_type_cd + where massin(test_kylin_fact.SELLER_ID,'vip_customers') + group by test_kylin_fact.leaf_categ_id http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/resources/query/sql_massin/query03.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_massin/query03.sql b/kylin-it/src/test/resources/query/sql_massin/query03.sql new file mode 100644 index 0000000..27a5918 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_massin/query03.sql @@ -0,0 +1,30 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. 
See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + + + select test_kylin_fact.lstg_format_name, sum(price) as GMV, count(*) as x + from test_kylin_fact inner JOIN edw.test_cal_dt as test_cal_dt + ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt + inner JOIN test_category_groupings + ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id + inner JOIN edw.test_sites as test_sites + ON test_kylin_fact.lstg_site_id = test_sites.site_id + inner JOIN edw.test_seller_type_dim as test_seller_type_dim + ON test_kylin_fact.slr_segment_cd = test_seller_type_dim.seller_type_cd + where massin(test_kylin_fact.SELLER_ID,'vip_customers') + group by test_kylin_fact.lstg_format_name having count(*) > 2 http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/kylin-it/src/test/resources/query/sql_massin/query04.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_massin/query04.sql b/kylin-it/src/test/resources/query/sql_massin/query04.sql new file mode 100644 index 0000000..f11939c --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_massin/query04.sql @@ -0,0 +1,27 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. 
See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +select sum(price) as GMV, count(*) as TRANS_CNT FROM test_kylin_fact + inner JOIN edw.test_cal_dt as test_cal_dt + ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt + inner JOIN test_category_groupings + ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id + AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id +where (test_kylin_fact.cal_dt < DATE '2012-05-01' or test_kylin_fact.cal_dt > DATE '2013-05-01') +and massin(test_kylin_fact.SELLER_ID,'vip_customers') + http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java index 5ea138f..2806b4b 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java @@ -57,7 +57,7 @@ import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.ConstantTupleFilter; import org.apache.kylin.metadata.filter.DynamicTupleFilter; import 
org.apache.kylin.metadata.filter.ExtractTupleFilter; -import org.apache.kylin.metadata.filter.FunctionTupleFilter; +import org.apache.kylin.metadata.filter.function.Functions; import org.apache.kylin.metadata.filter.LogicalTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; @@ -130,12 +130,12 @@ public class OLAPFilterRel extends Filter implements OLAPRel { if (op.getName().equalsIgnoreCase("extract_date")) { filter = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT); } else { - filter = new FunctionTupleFilter(op.getName()); + filter = Functions.getFunctionTupleFilter(op.getName()); } break; case LIKE: case OTHER_FUNCTION: - filter = new FunctionTupleFilter(op.getName()); + filter = Functions.getFunctionTupleFilter(op.getName()); break; default: throw new UnsupportedOperationException(op.getName()); http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java index f8711b1..44e9e47 100644 --- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java +++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java @@ -29,15 +29,21 @@ import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaFactory; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.util.ConversionUtil; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.metadata.model.DatabaseDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** */ public class OLAPSchemaFactory implements SchemaFactory { + public static final Logger logger = LoggerFactory.getLogger(OLAPSchemaFactory.class); + static { /* @@ -107,7 +113,13 @@ public class OLAPSchemaFactory implements SchemaFactory { out.write(" \"factory\": \"org.apache.kylin.query.schema.OLAPSchemaFactory\",\n"); out.write(" \"operand\": {\n"); out.write(" \"" + SCHEMA_PROJECT + "\": \"" + project + "\"\n"); - out.write(" }\n"); + out.write(" },\n"); + out.write(" \"functions\": [\n"); + out.write(" {\n"); + out.write(" name: 'MASSIN',\n"); + out.write(" className: 'org.apache.kylin.query.udf.MassInUDF'\n"); + out.write(" }\n"); + out.write(" ]\n"); out.write(" }\n"); if (++counter != schemaCounts.size()) { @@ -118,8 +130,10 @@ public class OLAPSchemaFactory implements SchemaFactory { out.write(" ]\n"); out.write("}\n"); out.close(); - tmp.deleteOnExit(); + + logger.info("Schema json:" + StringUtils.join(FileUtils.readLines(tmp), "\n")); + return tmp; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/query/src/main/java/org/apache/kylin/query/udf/MassInUDF.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/udf/MassInUDF.java b/query/src/main/java/org/apache/kylin/query/udf/MassInUDF.java new file mode 100644 index 0000000..596fa1e --- /dev/null +++ b/query/src/main/java/org/apache/kylin/query/udf/MassInUDF.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.udf; + +import org.apache.calcite.linq4j.function.Parameter; + +public class MassInUDF { + + public boolean eval(@Parameter(name = "col") Object col, @Parameter(name = "filterTable") String filterTable) { + return true; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/server/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java index 9a6036d..4ec2116 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -200,6 +200,11 @@ public class CacheService extends BasicService { IIDescManager.clearCache(); CubeDescManager.clearCache(); break; + case EXTERNAL_FILTER: + getMetadataManager().reloadTableCache(cacheKey); + IIDescManager.clearCache(); + CubeDescManager.clearCache(); + break; case DATA_MODEL: getMetadataManager().reloadDataModelDesc(cacheKey); IIDescManager.clearCache(); @@ -262,6 +267,8 @@ public class CacheService extends BasicService { break; case TABLE: throw new UnsupportedOperationException(log); + case EXTERNAL_FILTER: + throw new UnsupportedOperationException(log); case DATA_MODEL: getMetadataManager().removeModelCache(cacheKey); break; 
http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java index a782020..f87bd30 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java @@ -24,7 +24,7 @@ import java.util.Set; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.kv.RowKeyColumnIO; import org.apache.kylin.dict.DictCodeSystem; -import org.apache.kylin.dict.TupleFilterFunctionTransformer; +import org.apache.kylin.dict.BuildInFunctionTransformer; import org.apache.kylin.dimension.Dictionary; import org.apache.kylin.dimension.DimensionEncoding; import org.apache.kylin.dimension.IDimensionEncodingMap; @@ -151,7 +151,7 @@ public class FilterDecorator implements TupleFilterSerializer.Decorator { if (filter == null) return null; - TupleFilterFunctionTransformer translator = new TupleFilterFunctionTransformer(dimEncMap); + BuildInFunctionTransformer translator = new BuildInFunctionTransformer(dimEncMap); filter = translator.transform(filter); // un-evaluatable filter is replaced with TRUE http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index e6f9ac1..bde2196 100644 --- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -135,7 +135,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } try { current++; - return queue.poll(timeout, TimeUnit.MILLISECONDS); + byte[] ret = queue.poll(timeout, TimeUnit.MILLISECONDS); + if (ret == null) { + throw new RuntimeException("Timeout visiting cube!"); + } else { + return ret; + } } catch (InterruptedException e) { throw new RuntimeException("error when waiting queue", e); } @@ -236,7 +241,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (baseShard + shardNum - 1)))); } else { //0,1,2,3,4 wants 4,0 - return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))),// + return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))), // Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1)))); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java index c69d31f..ea38508 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java @@ -40,7 +40,8 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.gridtable.CubeGridTable; import 
org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.TupleFilterFunctionTransformer; +import org.apache.kylin.dict.BuildInFunctionTransformer; +import org.apache.kylin.dimension.DimensionEncoding; import org.apache.kylin.gridtable.EmptyGTScanner; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; @@ -52,8 +53,10 @@ import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.filter.ITupleFilterTransformer; import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +76,7 @@ public class CubeSegmentScanner implements IGTScanner { final Cuboid cuboid; public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // - Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) { + Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) { this.cuboid = cuboid; this.cubeSeg = cubeSeg; this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId()); @@ -81,7 +84,7 @@ public class CubeSegmentScanner implements IGTScanner { CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); // translate FunctionTupleFilter to IN clause - ITupleFilterTransformer translator = new TupleFilterFunctionTransformer(cubeSeg.getDimensionEncodingMap()); + ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap()); filter = translator.transform(filter); //replace the constant values in filter to dictionary codes @@ -242,6 +245,12 @@ public class 
CubeSegmentScanner implements IGTScanner { public Scanner() { CubeHBaseRPC rpc; if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) { + MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() { + @Override + public DimensionEncoding getDimEnc(TblColRef col) { + return info.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex()); + } + }); rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); } else { rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);//default behavior http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 7ac4be1..9f42c1c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -40,10 +40,13 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.dimension.DimensionEncoding; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; +import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; import 
org.apache.kylin.storage.hbase.cube.v2.CellListIterator; import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC; @@ -51,6 +54,7 @@ import org.apache.kylin.storage.hbase.cube.v2.HBaseReadonlyStore; import org.apache.kylin.storage.hbase.cube.v2.RawScan; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList; +import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,6 +145,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement @Override public void visitCube(RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) { + RegionScanner innerScanner = null; HRegion region = null; @@ -153,8 +158,15 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement region = env.getRegion(); region.startRegionOperation(); - GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); - RawScan hbaseRawScan = RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); + final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); + final RawScan hbaseRawScan = RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); + + MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() { + @Override + public DimensionEncoding getDimEnc(TblColRef col) { + return scanReq.getInfo().getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex()); + } + }); 
List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList(); for (IntList intList : request.getHbaseColumnsToGTList()) { @@ -165,7 +177,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement //if has shard, fill region shard to raw scan start/end updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN); } - + Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan); appendProfileInfo(sb); @@ -181,8 +193,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize()); IGTScanner rawScanner = store.scan(scanReq); - IGTScanner finalScanner = scanReq.decorateScanner(rawScanner,// - behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(),// + IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, // + behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), // behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal()); ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); @@ -226,7 +238,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement setFreeSwapSpaceSize(freeSwapSpaceSize).// setHostname(InetAddress.getLocalHost().getHostName()).// setEtcMsg(sb.toString()).// - build()).// + build()) + .// build()); } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderFactoryImpl.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderFactoryImpl.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderFactoryImpl.java new file mode 100644 index 
0000000..fe6276f
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderFactoryImpl.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2.filter;
+
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.metadata.filter.UDF.MassInValueProvider;
+import org.apache.kylin.metadata.filter.UDF.MassInValueProviderFactory;
+import org.apache.kylin.metadata.filter.function.Functions;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * {@link MassInValueProviderFactory} used on the HBase storage side. Each provider it
+ * creates is a {@link MassInValueProviderImpl} whose values are encoded with the
+ * {@link DimensionEncoding} that the supplied {@link DimEncAware} reports for the column.
+ */
+public class MassInValueProviderFactoryImpl implements MassInValueProviderFactory {
+
+    /**
+     * Looks up the {@link DimensionEncoding} to use for a given column.
+     */
+    public interface DimEncAware {
+        DimensionEncoding getDimEnc(TblColRef col);
+    }
+
+    private final DimEncAware dimEncAware;
+
+    /**
+     * @param dimEncAware source of the per-column encoding used by {@link #getProvider};
+     *            must not be null
+     * @throws NullPointerException if {@code dimEncAware} is null
+     */
+    public MassInValueProviderFactoryImpl(DimEncAware dimEncAware) {
+        if (dimEncAware == null) {
+            throw new NullPointerException("dimEncAware must not be null");
+        }
+        this.dimEncAware = dimEncAware;
+    }
+
+    /**
+     * Creates a {@link MassInValueProviderImpl} for the given filter resource, encoding its
+     * values with the dimension encoding that {@link DimEncAware} reports for {@code col}.
+     */
+    @Override
+    public MassInValueProvider getProvider(Functions.FilterTableType filterTableType, String filterResourceIdentifier, TblColRef col) {
+        return new MassInValueProviderImpl(filterTableType, filterResourceIdentifier, dimEncAware.getDimEnc(col));
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4adea167/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
new file mode 100644
index 0000000..83a6671
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2.filter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.metadata.filter.UDF.MassInValueProvider;
+import org.apache.kylin.metadata.filter.function.Functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+
+/**
+ * {@link MassInValueProvider} that eagerly loads a set of filter values and stores each
+ * one encoded with a column's {@link DimensionEncoding}.
+ * <p>
+ * Only {@link Functions.FilterTableType#HDFS} is supported: the file at the given path
+ * is read as UTF-8 text and every line becomes one value.
+ */
+public class MassInValueProviderImpl implements MassInValueProvider {
+    public static final Logger logger = LoggerFactory.getLogger(MassInValueProviderImpl.class);
+
+    private static final String CHARSET = "UTF-8";
+
+    private final Set<ByteArray> ret = Sets.newHashSet();
+
+    /**
+     * Loads and encodes all values of the filter table.
+     *
+     * @param filterTableType where the filter values live; only HDFS is supported
+     * @param filterResourceIdentifier for HDFS, the path of the file to read
+     * @param encoding encoding applied to every line before it is added to the value set
+     * @throws UnsupportedOperationException if {@code filterTableType} is not HDFS
+     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
+     */
+    public MassInValueProviderImpl(Functions.FilterTableType filterTableType, String filterResourceIdentifier, DimensionEncoding encoding) {
+        if (filterTableType != Functions.FilterTableType.HDFS) {
+            throw new UnsupportedOperationException("HBASE_TABLE FilterTableType Not supported yet");
+        }
+
+        logger.info("Start to load HDFS filter table from {}", filterResourceIdentifier);
+        Stopwatch stopwatch = new Stopwatch().start();
+
+        InputStream inputStream = null;
+        try {
+            FileSystem fileSystem = FileSystem.get(HBaseConfiguration.create());
+            inputStream = fileSystem.open(new Path(filterResourceIdentifier));
+            // Explicit charset: the platform default is not guaranteed to be the same on every server.
+            List<String> lines = IOUtils.readLines(inputStream, CHARSET);
+
+            logger.info("Load HDFS finished after {} millis", stopwatch.elapsedMillis());
+
+            int encodedLength = encoding.getLengthOfEncoding();
+            for (String line : lines) {
+                byte[] valueBytes = line.getBytes(CHARSET);
+                ByteArray byteArray = ByteArray.allocate(encodedLength);
+                encoding.encode(valueBytes, valueBytes.length, byteArray.array(), 0);
+                ret.add(byteArray);
+            }
+
+            logger.info("Mass In values constructed after {} millis, containing {} entries", stopwatch.elapsedMillis(), ret.size());
+        } catch (IOException e) {
+            throw new RuntimeException("error when loading the mass in values", e);
+        } finally {
+            // Always release the HDFS stream, even if reading or encoding failed.
+            IOUtils.closeQuietly(inputStream);
+        }
+    }
+
+    /**
+     * @return the encoded filter values (the provider's internal set, not a copy)
+     */
+    @Override
+    public Set<?> getMassInValues() {
+        return ret;
+    }
+}
