http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java index f2894e7..ce50122 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java @@ -56,6 +56,10 @@ public class ColumnRowType { return columns.get(index); } + public TblColRef getColumnByName(String columnName) { + return getColumnByIndexNullable(getIndexByName(columnName)); + } + public int getIndexByName(String columnName) { for (int i = 0; i < columns.size(); i++) { if (columns.get(i).getName().equals(columnName)) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java index e7b09a3..8e93659 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java @@ -158,7 +158,7 @@ public class OLAPProjectRel extends Project implements OLAPRel { RexCall call = (RexCall) rexNode; column = translateRexCall(call, inputColumnRowType, fieldName, sourceCollector); } else { - throw new IllegalStateException("Unsupport RexNode " + rexNode); + throw new IllegalStateException("Unsupported RexNode " + rexNode); } return column; } http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java index b583291..7801891 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java @@ -238,7 +238,7 @@ public class OLAPTableScan extends TableScan implements OLAPRel, EnumerableRel { TableRef tableRef = TblColRef.tableForUnknownModel(this.alias, olapTable.getSourceTable()); List<TblColRef> columns = new ArrayList<TblColRef>(); - for (ColumnDesc sourceColumn : olapTable.getExposedColumns()) { + for (ColumnDesc sourceColumn : olapTable.getSourceColumns()) { TblColRef colRef = TblColRef.columnForUnknownModel(tableRef, sourceColumn); columns.add(colRef); } http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/main/java/org/apache/kylin/query/schema/OLAPSchema.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchema.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchema.java index fe897e9..5387a74 100644 --- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchema.java +++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchema.java @@ -41,6 +41,7 @@ public class OLAPSchema extends AbstractSchema { private KylinConfig config; private String projectName; private String schemaName; + private boolean exposeMore; private StorageURL storageUrl; private String starSchemaUrl; private String starSchemaUser; @@ -54,9 +55,10 @@ public class OLAPSchema extends AbstractSchema { this.starSchemaPassword = config.getHivePassword(); } - public OLAPSchema(String project, String schemaName) { + public OLAPSchema(String project, String schemaName, boolean exposeMore) { this.projectName = ProjectInstance.getNormalizedProjectName(project); this.schemaName = schemaName; + this.exposeMore = exposeMore; init(); } @@ -66,19 +68,21 @@ public class OLAPSchema extends AbstractSchema { * @return */ @Override - protected Map<String, Table> getTableMap() { + public Map<String, Table> getTableMap() { return buildTableMap(); } private Map<String, Table> buildTableMap() { Map<String, Table> olapTables = new HashMap<String, Table>(); - Collection<TableDesc> projectTables = ProjectManager.getInstance(config).listExposedTables(projectName); + Collection<TableDesc> projectTables = exposeMore + ? ProjectManager.getInstance(config).listDefinedTables(projectName) + : ProjectManager.getInstance(config).listExposedTables(projectName); for (TableDesc tableDesc : projectTables) { if (tableDesc.getDatabase().equals(schemaName)) { final String tableName = tableDesc.getName();//safe to use tableDesc.getName() here, it is in a DB context now - final OLAPTable table = new OLAPTable(this, tableDesc); + final OLAPTable table = new OLAPTable(this, tableDesc, exposeMore); olapTables.put(tableName, table); //logger.debug("Project " + projectName + " exposes table " + tableName); } http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java index c098b23..2e9c951 100644 --- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java +++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java @@ -54,7 +54,7 @@ public class OLAPSchemaFactory implements SchemaFactory { @Override public Schema create(SchemaPlus parentSchema, String schemaName, Map<String, Object> operand) { String project = (String) operand.get(SCHEMA_PROJECT); - Schema newSchema = new OLAPSchema(project, schemaName); + Schema newSchema = new OLAPSchema(project, schemaName, false); return newSchema; } http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java index a190166..3401033 100644 --- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java +++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java @@ -92,13 +92,15 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab SQLTYPE_MAPPING.put("any", SqlTypeName.ANY); } + private final boolean exposeMore; private final OLAPSchema olapSchema; private final TableDesc sourceTable; private RelDataType rowType; - private List<ColumnDesc> exposedColumns; + private List<ColumnDesc> sourceColumns; - public OLAPTable(OLAPSchema schema, TableDesc tableDesc) { + public OLAPTable(OLAPSchema schema, TableDesc tableDesc, boolean exposeMore) { super(Object[].class); + this.exposeMore = exposeMore; this.olapSchema = schema; this.sourceTable = tableDesc; this.rowType = null; @@ -116,15 +118,18 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab return this.sourceTable.getIdentity(); } - public List<ColumnDesc> getExposedColumns() { - return exposedColumns; + public List<ColumnDesc> getSourceColumns() { + if (sourceColumns == null) { + sourceColumns = listSourceColumns(); + } + return sourceColumns; } @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { if (this.rowType == null) { // always build exposedColumns and rowType together - this.exposedColumns = listSourceColumns(); + this.sourceColumns = getSourceColumns(); this.rowType = deriveRowType(typeFactory); } return this.rowType; @@ -132,7 +137,7 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab private RelDataType deriveRowType(RelDataTypeFactory typeFactory) { RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder(); - for (ColumnDesc column : exposedColumns) { + for (ColumnDesc column : sourceColumns) { RelDataType sqlType = createSqlType(typeFactory, column.getUpgradedType(), column.isNullable()); sqlType = SqlTypeUtil.addCharsetAndCollation(sqlType, typeFactory); fieldInfo.add(column.getName(), sqlType); @@ -169,7 +174,10 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab private List<ColumnDesc> listSourceColumns() { ProjectManager mgr = ProjectManager.getInstance(olapSchema.getConfig()); - List<ColumnDesc> tableColumns = mgr.listExposedColumns(olapSchema.getProjectName(), sourceTable); + // take care of computed columns + boolean exposeMore = olapSchema.getConfig().isPushDownEnabled() || this.exposeMore; + + List<ColumnDesc> tableColumns = mgr.listExposedColumns(olapSchema.getProjectName(), sourceTable, exposeMore); List<ColumnDesc> metricColumns = Lists.newArrayList(); List<MeasureDesc> countMeasures = mgr.listEffectiveRewriteMeasures(olapSchema.getProjectName(), http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java b/query/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java index 23faf8e..cbffe03 100644 --- a/query/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java +++ b/query/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java @@ -21,7 +21,7 @@ package org.apache.kylin.query.util; public class KeywordDefaultDirtyHack implements QueryUtil.IQueryTransformer { @Override - public String transform(String sql, String project) { + public String transform(String sql, String project, String defaultSchema) { // KYLIN-2108, DEFAULT is hive default database, but a sql keyword too, needs quote sql = sql.replace("DEFAULT.", "\"DEFAULT\"."); sql = sql.replace("default.", "\"default\"."); http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java index 51352d2..d89b04e 100644 --- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java +++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java @@ -35,6 +35,9 @@ import org.apache.calcite.sql.SqlJoin; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOrderBy; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.util.SqlVisitor; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.text.StrBuilder; @@ -60,7 +63,7 @@ import com.google.common.collect.Lists; public class PushDownUtil { private static final Logger logger = LoggerFactory.getLogger(PushDownUtil.class); - public static boolean doPushDownQuery(String project, String sql, String schema, List<List<String>> results, + public static boolean doPushDownQuery(String project, String sql, String defaultSchema, List<List<String>> results, List<SelectedColumnMeta> columnMetas, SQLException sqlException) throws Exception { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); @@ -75,42 +78,49 @@ public class PushDownUtil { logger.info("Query failed to utilize pre-calculation, routing to other engines", sqlException); IPushDownRunner runner = (IPushDownRunner) ClassUtil.newInstance(kylinConfig.getPushDownRunnerClassName()); - IPushDownConverter converter = (IPushDownConverter) ClassUtil - .newInstance(kylinConfig.getPushDownConverterClassName()); - runner.init(kylinConfig); - - logger.debug("Query pushdown runner {}", runner); - - String expandCC = restoreComputedColumnToExpr(sql, project); - if (!StringUtils.equals(expandCC, sql)) { - logger.info("computed column in sql is expanded to: " + expandCC); - } - if (schema != null && !schema.equals("DEFAULT")) { - expandCC = schemaCompletion(expandCC, schema); + logger.debug("Query Pushdown runner {}", runner); + + // String expandCC = restoreComputedColumnToExpr(sql, project); + // if (!StringUtils.equals(expandCC, sql)) { + // logger.info("computed column in sql is expanded to: " + expandCC); + // } + + // default schema in calcite does not apply to other engines. + // since this is a universql requirement, it's not implemented as a converter + if (defaultSchema != null && !defaultSchema.equals("DEFAULT")) { + String completed = schemaCompletion(sql, defaultSchema); + if (!sql.equals(completed)) { + logger.info("the query is converted to {} after schema completion", completed); + sql = completed; + } } - String adhocSql = converter.convert(expandCC); - if (!adhocSql.equals(expandCC)) { - logger.info("the query is converted to {} according to kylin.query.pushdown.converter-class-name", - adhocSql); + + for (String converterName : kylinConfig.getPushDownConverterClassNames()) { + IPushDownConverter converter = (IPushDownConverter) ClassUtil.newInstance(converterName); + String converted = converter.convert(sql); + if (!sql.equals(converted)) { + logger.info("the query is converted to {} after applying converter {}", converted, converterName); + sql = converted; + } } - runner.executeQuery(adhocSql, results, columnMetas); + runner.executeQuery(sql, results, columnMetas); return true; } else { return false; } } - static String schemaCompletion(String inputSql, String schema) { + static String schemaCompletion(String inputSql, String schema) throws SqlParseException { if (inputSql == null || inputSql.equals("")) { return ""; } - SqlNode fromNode = CalciteParser.getFromNode(inputSql); + SqlNode node = CalciteParser.parse(inputSql); // get all table node that don't have schema by visitor pattern FromTablesVisitor ftv = new FromTablesVisitor(); - fromNode.accept(ftv); + node.accept(ftv); List<SqlNode> tablesWithoutSchema = ftv.getTablesWithoutSchema(); // sql do not need completion if (tablesWithoutSchema.isEmpty()) { @@ -140,10 +150,6 @@ public class PushDownUtil { //find pattern like table.column or column + "((?<![\\p{L}_0-9\\.\\\"])([\\p{L}_0-9]+\\.)?([\\p{L}_0-9]+)(?![\\p{L}_0-9\\.\\\"]))"); - private final static Pattern identifierInExprPattern = Pattern.compile( - // a.b.c - "((?<![\\p{L}_0-9\\.\\\"])([\\p{L}_0-9]+\\.)([\\p{L}_0-9]+\\.)([\\p{L}_0-9]+)(?![\\p{L}_0-9\\.\\\"]))"); - private final static Pattern endWithAsPattern = Pattern.compile("\\s+as\\s+$", Pattern.CASE_INSENSITIVE); public static String restoreComputedColumnToExpr(String beforeSql, String project) { @@ -226,74 +232,84 @@ public class PushDownUtil { return CalciteParser.insertAliasInExpr(expr, tableAlias); } -} - -/** - * Created by jiatao.tao - * Get all the tables from "FROM" clause that without schema - */ -class FromTablesVisitor implements SqlVisitor<SqlNode> { - private List<SqlNode> tables; - - FromTablesVisitor() { - this.tables = new ArrayList<>(); - } - List<SqlNode> getTablesWithoutSchema() { - return tables; - } + /** + * Get all the tables from "FROM clause" that without schema + * subquery is only considered in "from clause" + */ + static class FromTablesVisitor implements SqlVisitor<SqlNode> { + private List<SqlNode> tables; - @Override - public SqlNode visit(SqlNodeList nodeList) { - return null; - } + FromTablesVisitor() { + this.tables = new ArrayList<>(); + } - @Override - public SqlNode visit(SqlLiteral literal) { - return null; - } + List<SqlNode> getTablesWithoutSchema() { + return tables; + } - @Override - public SqlNode visit(SqlCall call) { - if (call instanceof SqlBasicCall) { - SqlBasicCall node = (SqlBasicCall) call; - node.getOperands()[0].accept(this); + @Override + public SqlNode visit(SqlNodeList nodeList) { return null; } - if (call instanceof SqlJoin) { - SqlJoin node = (SqlJoin) call; - node.getLeft().accept(this); - node.getRight().accept(this); + + @Override + public SqlNode visit(SqlLiteral literal) { return null; } - for (SqlNode operand : call.getOperandList()) { - if (operand != null) { - operand.accept(this); + + @Override + public SqlNode visit(SqlCall call) { + if (call instanceof SqlSelect) { + SqlSelect select = (SqlSelect) call; + select.getFrom().accept(this); + return null; } + if (call instanceof SqlOrderBy) { + SqlOrderBy orderBy = (SqlOrderBy) call; + ((SqlSelect) orderBy.query).getFrom().accept(this); + return null; + } + if (call instanceof SqlBasicCall) { + SqlBasicCall node = (SqlBasicCall) call; + node.getOperands()[0].accept(this); + return null; + } + if (call instanceof SqlJoin) { + SqlJoin node = (SqlJoin) call; + node.getLeft().accept(this); + node.getRight().accept(this); + return null; + } + for (SqlNode operand : call.getOperandList()) { + if (operand != null) { + operand.accept(this); + } + } + return null; } - return null; - } - @Override - public SqlNode visit(SqlIdentifier id) { - if (id.names.size() == 1) { - tables.add(id); + @Override + public SqlNode visit(SqlIdentifier id) { + if (id.names.size() == 1) { + tables.add(id); + } + return null; } - return null; - } - @Override - public SqlNode visit(SqlDataTypeSpec type) { - return null; - } + @Override + public SqlNode visit(SqlDataTypeSpec type) { + return null; + } - @Override - public SqlNode visit(SqlDynamicParam param) { - return null; - } + @Override + public SqlNode visit(SqlDynamicParam param) { + return null; + } - @Override - public SqlNode visit(SqlIntervalQualifier intervalQualifier) { - return null; + @Override + public SqlNode visit(SqlIntervalQualifier intervalQualifier) { + return null; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java b/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java index 4befd33..3796d44 100644 --- a/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java +++ b/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java @@ -38,10 +38,10 @@ public class QueryUtil { private static List<IQueryTransformer> queryTransformers; public interface IQueryTransformer { - String transform(String sql, String project); + String transform(String sql, String project, String defaultSchema); } - public static String massageSql(String sql, String project, int limit, int offset) { + public static String massageSql(String sql, String project, int limit, int offset, String defaultSchema) { sql = sql.trim(); sql = sql.replace("\r", " ").replace("\n", System.getProperty("line.separator")); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); @@ -68,7 +68,7 @@ public class QueryUtil { initQueryTransformers(); } for (IQueryTransformer t : queryTransformers) { - sql = t.transform(sql, project); + sql = t.transform(sql, project, defaultSchema); } return sql; } @@ -95,15 +95,22 @@ public class QueryUtil { private static final String S0 = "\\s*"; private static final String S1 = "\\s"; private static final String SM = "\\s+"; - private static final Pattern PTN_GROUP_BY = Pattern.compile(S1 + "GROUP" + SM + "BY" + S1, Pattern.CASE_INSENSITIVE); - private static final Pattern PTN_HAVING_COUNT_GREATER_THAN_ZERO = Pattern.compile(S1 + "HAVING" + SM + "[(]?" + S0 + "COUNT" + S0 + "[(]" + S0 + "1" + S0 + "[)]" + S0 + ">" + S0 + "0" + S0 + "[)]?", Pattern.CASE_INSENSITIVE); - private static final Pattern PTN_SUM_1 = Pattern.compile(S0 + "SUM" + S0 + "[(]" + S0 + "[1]" + S0 + "[)]" + S0, Pattern.CASE_INSENSITIVE); + private static final Pattern PTN_GROUP_BY = Pattern.compile(S1 + "GROUP" + SM + "BY" + S1, + Pattern.CASE_INSENSITIVE); + private static final Pattern PTN_HAVING_COUNT_GREATER_THAN_ZERO = Pattern.compile(S1 + "HAVING" + SM + "[(]?" + + S0 + "COUNT" + S0 + "[(]" + S0 + "1" + S0 + "[)]" + S0 + ">" + S0 + "0" + S0 + "[)]?", + Pattern.CASE_INSENSITIVE); + private static final Pattern PTN_SUM_1 = Pattern.compile(S0 + "SUM" + S0 + "[(]" + S0 + "[1]" + S0 + "[)]" + S0, + Pattern.CASE_INSENSITIVE); private static final Pattern PTN_NOT_EQ = Pattern.compile(S0 + "!=" + S0, Pattern.CASE_INSENSITIVE); - private static final Pattern PTN_INTERVAL = Pattern.compile("interval" + SM + "(floor\\()([\\d\\.]+)(\\))" + SM + "(second|minute|hour|day|month|year)", Pattern.CASE_INSENSITIVE); - private static final Pattern PTN_HAVING_ESCAPE_FUNCTION = Pattern.compile("\\{fn" + "(.*?)" + "\\}", Pattern.CASE_INSENSITIVE); + private static final Pattern PTN_INTERVAL = Pattern.compile( + "interval" + SM + "(floor\\()([\\d\\.]+)(\\))" + SM + "(second|minute|hour|day|month|year)", + Pattern.CASE_INSENSITIVE); + private static final Pattern PTN_HAVING_ESCAPE_FUNCTION = Pattern.compile("\\{fn" + "(.*?)" + "\\}", + Pattern.CASE_INSENSITIVE); @Override - public String transform(String sql, String project) { + public String transform(String sql, String project, String defaultSchema) { Matcher m; // Case fn{ EXTRACT(...) } http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java b/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java index 8ed58f7..afae2f2 100644 --- a/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java +++ b/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java @@ -17,6 +17,7 @@ */ package org.apache.kylin.query.util; +import org.apache.calcite.sql.parser.SqlParseException; import org.apache.kylin.metadata.model.ComputedColumnDesc; import org.junit.Assert; import org.junit.Test; @@ -24,7 +25,7 @@ import org.mockito.Mockito; public class PushDownUtilTest { @Test - public void testSchemaCompletion() { + public void testSchemaCompletion() throws SqlParseException { String sql1 = "SELECT a \n" + "FROM a.KYLIN_SALES as KYLIN_SALES\n" + "INNER JOIN \"A\".KYLIN_ACCOUNT as BUYER_ACCOUNT\n" + @@ -43,7 +44,10 @@ public class PushDownUtilTest { " ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id\n" + " inner JOIN test_sites as test_sites\n" + " ON test_kylin_fact.lstg_site_id = test_sites.site_id\n" + + " where price > 100\n" + " group by test_cal_dt.week_beg_dt\n" + + " having sum(price) > 1000\n" + + " order by sum(price)\n" + ") t1\n" + "inner join (\n" + " select test_cal_dt.week_beg_dt, count(*) as cnt\n" + @@ -77,7 +81,10 @@ public class PushDownUtilTest { " ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id\n" + " inner JOIN EDW.test_sites as test_sites\n" + " ON test_kylin_fact.lstg_site_id = test_sites.site_id\n" + + " where price > 100\n" + " group by test_cal_dt.week_beg_dt\n" + + " having sum(price) > 1000\n" + + " order by sum(price)\n" + ") t1\n" + "inner join (\n" + " select test_cal_dt.week_beg_dt, count(*) as cnt\n" + @@ -97,6 +104,40 @@ public class PushDownUtilTest { } @Test + public void testSchemaCompletionWithComplexSubquery() throws SqlParseException { + String sql = + "SELECT a, b " + + "FROM (" + + " SELECT c, d, sum(p) " + + " FROM table1 t1, DB.table2 t2 " + + " WHERE t1.c > t2.d " + + " GROUP BY t.e" + + " HAVING sum(p) > 100" + + " ORDER BY t2.f" + + ") at1 " + + "INNER JOIN table3 t3 " + + "ON at1.c = t3.c " + + "WHERE t3.d > 0 " + + "ORDER BY t3.e"; + + String exceptSQL = + "SELECT a, b " + + "FROM (" + + " SELECT c, d, sum(p) " + + " FROM EDW.table1 t1, DB.table2 t2 " + + " WHERE t1.c > t2.d " + + " GROUP BY t.e" + + " HAVING sum(p) > 100" + + " ORDER BY t2.f" + + ") at1 " + + "INNER JOIN EDW.table3 t3 " + + "ON at1.c = t3.c " + + "WHERE t3.d > 0 " + + "ORDER BY t3.e"; + Assert.assertEquals(exceptSQL, PushDownUtil.schemaCompletion(sql, "EDW")); + } + + @Test public void testReplaceIdentifierInExpr() { { String ret = PushDownUtil.replaceIdentifierInExpr("x * y", null, false); http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java b/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java index f168d1e..942ef0b 100644 --- a/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java +++ b/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java @@ -40,13 +40,15 @@ public class QueryUtilTest extends LocalFileMetadataTestCase { public void testMassageSql() { { String sql = "select ( date '2001-09-28' + interval floor(1.2) day) from test_kylin_fact"; - String s = QueryUtil.massageSql(sql, null, 0, 0); + String s = QueryUtil.massageSql(sql, null, 0, 0, "DEFAULT"); Assert.assertEquals("select ( date '2001-09-28' + interval '1' day) from test_kylin_fact", s); } { String sql = "select ( date '2001-09-28' + interval floor(2) month) from test_kylin_fact group by ( date '2001-09-28' + interval floor(2) month)"; - String s = QueryUtil.massageSql(sql, null, 0, 0); - Assert.assertEquals("select ( date '2001-09-28' + interval '2' month) from test_kylin_fact group by ( date '2001-09-28' + interval '2' month)", s); + String s = QueryUtil.massageSql(sql, null, 0, 0, "DEFAULT"); + Assert.assertEquals( + "select ( date '2001-09-28' + interval '2' month) from test_kylin_fact group by ( date '2001-09-28' + interval '2' month)", + s); } } @@ -54,7 +56,7 @@ public class QueryUtilTest extends LocalFileMetadataTestCase { public void testKeywordDefaultDirtyHack() { { String sql = "select * from DEFAULT.TEST_KYLIN_FACT"; - String s = QueryUtil.massageSql(sql, null, 0, 0); + String s = QueryUtil.massageSql(sql, null, 0, 0, "DEFAULT"); Assert.assertEquals("select * from \"DEFAULT\".TEST_KYLIN_FACT", s); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/server-base/src/main/java/org/apache/kylin/rest/controller2/ModelControllerV2.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller2/ModelControllerV2.java b/server-base/src/main/java/org/apache/kylin/rest/controller2/ModelControllerV2.java index f7cb844..12cd717 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller2/ModelControllerV2.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller2/ModelControllerV2.java @@ -103,7 +103,7 @@ public class ModelControllerV2 extends BasicController { // official models for (DataModelDesc m : modelService.listAllModels(modelName, projectName, exactMatch)) { Preconditions.checkState(!m.isDraft()); - + DataModelDescResponse r = new DataModelDescResponse(m); r.setProject(projectService.getProjectOfModel(m.getName())); response.add(r); @@ -154,7 +154,7 @@ public class ModelControllerV2 extends BasicController { DraftManager draftMgr = modelService.getDraftManager(); DataModelDesc modelDesc = deserializeDataModelDescV2(modelRequest); - modelService.validateModelDesc(modelDesc); + modelService.primaryCheck(modelDesc); String project = (null == modelRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME : modelRequest.getProject(); @@ -175,6 +175,20 @@ public class ModelControllerV2 extends BasicController { return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, data, ""); } + @RequestMapping(value = "/validness", method = { RequestMethod.POST }, produces = { + "application/vnd.apache.kylin-v2+json" }) + @ResponseBody + public EnvelopeResponse checkModel(@RequestBody ModelRequest modelRequest) throws IOException { + Preconditions.checkNotNull(modelRequest.getProject()); + Preconditions.checkNotNull(modelRequest.getModelDescData()); + + DataModelDesc modelDesc = deserializeDataModelDescV2(modelRequest); + modelService.primaryCheck(modelDesc); + modelService.checkCCExpression(modelDesc, modelRequest.getProject()); + + return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, null, ""); + } + @RequestMapping(value = "/draft", method = { RequestMethod.PUT }, produces = { "application/vnd.apache.kylin-v2+json" }) @ResponseBody @@ -182,11 +196,11 @@ public class ModelControllerV2 extends BasicController { DraftManager draftMgr = modelService.getDraftManager(); DataModelDesc modelDesc = deserializeDataModelDescV2(modelRequest); - modelService.validateModelDesc(modelDesc); + modelService.primaryCheck(modelDesc); String project = (null == modelRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME : modelRequest.getProject(); - + if (modelDesc.getUuid() == null) modelDesc.updateRandomUuid(); modelDesc.setDraft(true); @@ -201,8 +215,8 @@ public class ModelControllerV2 extends BasicController { return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, data, ""); } - @RequestMapping(value = "/{projectName}/{modelName}", method = {RequestMethod.DELETE}, produces = { - "application/vnd.apache.kylin-v2+json"}) + @RequestMapping(value = "/{projectName}/{modelName}", method = { RequestMethod.DELETE }, produces = { + "application/vnd.apache.kylin-v2+json" }) @ResponseBody public void deleteModelV2(@PathVariable String projectName, @PathVariable String modelName) throws IOException { Message msg = MsgPicker.getMsg(); @@ -270,7 +284,7 @@ public class ModelControllerV2 extends BasicController { DataModelDesc desc = null; try { - logger.debug("Saving MODEL " + modelRequest.getModelDescData()); + logger.debug("deserialize MODEL " + modelRequest.getModelDescData()); desc = JsonUtil.readValue(modelRequest.getModelDescData(), DataModelDesc.class); } catch (JsonParseException e) { logger.error("The data model definition is not valid.", e); @@ -289,11 +303,13 @@ public class ModelControllerV2 extends BasicController { Map<String, Set<String>> data = new HashMap<>(); - for (Map.Entry<TblColRef, Set<CubeInstance>> entry : modelService.getUsedDimCols(modelName, projectName).entrySet()) { + for (Map.Entry<TblColRef, Set<CubeInstance>> entry : modelService.getUsedDimCols(modelName, projectName) + .entrySet()) { populateUsedColResponse(entry.getKey(), entry.getValue(), data); } - for (Map.Entry<TblColRef, Set<CubeInstance>> entry : modelService.getUsedNonDimCols(modelName, projectName).entrySet()) { + for (Map.Entry<TblColRef, Set<CubeInstance>> entry : modelService.getUsedNonDimCols(modelName, projectName) + .entrySet()) { populateUsedColResponse(entry.getKey(), entry.getValue(), data); } http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java index 4efb894..6373cb8 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java @@ -27,15 +27,23 @@ import java.util.Map; import java.util.Set; import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.common.util.HiveCmdBuilder; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.metadata.draft.Draft; +import org.apache.kylin.metadata.model.ComputedColumnDesc; import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.JoinsTree; import org.apache.kylin.metadata.model.ModelDimensionDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.model.tool.CalciteParser; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.exception.ForbiddenException; @@ -73,7 +81,8 @@ public class ModelService extends BasicService { @Autowired private AclEvaluate aclEvaluate; - public List<DataModelDesc> listAllModels(final String modelName, final String projectName, boolean exactMatch) throws IOException { + public List<DataModelDesc> listAllModels(final String modelName, final String projectName, boolean exactMatch) + throws IOException { List<DataModelDesc> models; ProjectInstance project = (null != projectName) ? getProjectManager().getProject(projectName) : null; @@ -204,9 +213,97 @@ public class ModelService extends BasicService { return ret; } - private boolean validateUpdatingModel(DataModelDesc dataModelDesc, String project) throws IOException { + /** + * check if the computed column expressions are valid ( in hive) + */ + public boolean checkCCExpression(final DataModelDesc dataModelDesc, String project) throws IOException { + + dataModelDesc.setDraft(false); + if (dataModelDesc.getUuid() == null) + dataModelDesc.updateRandomUuid(); + + dataModelDesc.init(getConfig(), getMetadataManager().getAllTablesMap(project), + getMetadataManager().listDataModels()); + + for (ComputedColumnDesc cc : dataModelDesc.getComputedColumnDescs()) { + + //check by calcite parser + CalciteParser.ensureAliasInExpr(cc.getExpression(), dataModelDesc.getAliasMap().keySet()); + + //check by hive cli, this could be slow + StringBuilder sb = new StringBuilder(); + sb.append("select "); + sb.append(cc.getExpression()); + sb.append(" "); + JoinedFlatTable.appendJoinStatement(new IJoinedFlatTableDesc() { + @Override + public String getTableName() { + return null; + } + + @Override + public DataModelDesc getDataModel() { + return dataModelDesc; + } + + @Override + public List<TblColRef> getAllColumns() { + return null; + } + + @Override + public int getColumnIndex(TblColRef colRef) { + return 0; + } + + @Override + public long getSourceOffsetStart() { + return 0; + } + + @Override + public long getSourceOffsetEnd() { + return 0; + } + + @Override + public TblColRef getDistributedBy() { + return null; + } + + @Override + public TblColRef getClusterBy() { + return null; + } + + @Override + public ISegment getSegment() { + return null; + } + }, sb, false); + sb.append(" limit 0"); + + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.addStatement(sb.toString()); + + long ts = System.currentTimeMillis(); + Pair<Integer, String> response = KylinConfig.getInstanceFromEnv().getCliCommandExecutor() + .execute(hiveCmdBuilder.toString()); + logger.debug("Spent " + (System.currentTimeMillis() - ts) + + " ms to execute the hive command to validate computed column expression: " + cc.getExpression()); + if (response.getFirst() != 0) { + throw new IllegalArgumentException("The expression " + cc.getExpression() + + " failed syntax check with output message: " + response.getSecond()); + } + } + + return true; + } + + private boolean checkIfBreakExistingCubes(DataModelDesc dataModelDesc, String project) throws IOException { String modelName = dataModelDesc.getName(); List<CubeInstance> cubes = cubeService.listAllCubes(null, project, modelName, true); + if (cubes != null && cubes.size() != 0) { dataModelDesc.init(getConfig(), getMetadataManager().getAllTablesMap(dataModelDesc.getProject()), getMetadataManager().listDataModels()); @@ -254,7 +351,7 @@ public class ModelService extends BasicService { return true; } - public void validateModelDesc(DataModelDesc modelDesc) { + public void primaryCheck(DataModelDesc modelDesc) { Message msg = MsgPicker.getMsg(); if (modelDesc == null) { @@ -276,7 +373,7 @@ public class ModelService extends BasicService { public DataModelDesc updateModelToResourceStore(DataModelDesc modelDesc, String projectName) throws IOException { aclEvaluate.hasProjectWritePermission(getProjectManager().getProject(projectName)); Message msg = MsgPicker.getMsg(); - + modelDesc.setDraft(false); if (modelDesc.getUuid() == null) modelDesc.updateRandomUuid(); @@ -287,7 +384,7 @@ public class ModelService extends BasicService { modelDesc = createModelDesc(projectName, modelDesc); } else { // update - if (!validateUpdatingModel(modelDesc, projectName)) { + if (!checkIfBreakExistingCubes(modelDesc, projectName)) { throw new BadRequestException(msg.getUPDATE_MODEL_KEY_FIELD()); } modelDesc = updateModelAndDesc(modelDesc); @@ -299,7 +396,7 @@ public class ModelService extends BasicService { if (!modelDesc.getError().isEmpty()) { throw new BadRequestException(String.format(msg.getBROKEN_MODEL_DESC(), modelDesc.getName())); } - + return modelDesc; } @@ -321,7 +418,7 @@ public class ModelService extends BasicService { return null; } - public List<Draft> listModelDrafts(String modelName, String projectName) throws IOException { + public List<Draft> listModelDrafts(String modelName, String projectName) throws IOException { ProjectInstance project = (null != projectName) ? getProjectManager().getProject(projectName) : null; if (null == project) { aclEvaluate.checkIsGlobalAdmin(); @@ -339,7 +436,7 @@ public class ModelService extends BasicService { result.add(d); } } - + return result; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/aed840f9/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index f469117..75b968b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -82,6 +82,7 @@ import org.apache.kylin.metadata.querymeta.TableMetaWithType; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.QueryConnection; import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.query.util.PushDownUtil; import org.apache.kylin.query.util.QueryUtil; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.BadRequestException; @@ -94,7 +95,6 @@ import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.AclEvaluate; -import org.apache.kylin.query.util.PushDownUtil; import org.apache.kylin.rest.util.TableauInterceptor; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -473,38 +473,47 @@ public class QueryService extends BasicService { } private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception { - String userInfo = SecurityContextHolder.getContext().getAuthentication().getName(); - final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext() - .getAuthentication().getAuthorities(); - for (GrantedAuthority grantedAuthority : grantedAuthorities) { - userInfo += ","; - userInfo += grantedAuthority.getAuthority(); - } + Connection conn = null; - SQLResponse fakeResponse = TableauInterceptor.tableauIntercept(sqlRequest.getSql()); - if (null != fakeResponse) { - logger.debug("Return fake response, is exception? " + fakeResponse.getIsException()); - return fakeResponse; - } + try { + conn = QueryConnection.getConnection(sqlRequest.getProject()); - String correctedSql = QueryUtil.massageSql(sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), - sqlRequest.getOffset()); - if (!correctedSql.equals(sqlRequest.getSql())) { - logger.info("The corrected query: " + correctedSql); + String userInfo = SecurityContextHolder.getContext().getAuthentication().getName(); + final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext() + .getAuthentication().getAuthorities(); + for (GrantedAuthority grantedAuthority : grantedAuthorities) { + userInfo += ","; + userInfo += grantedAuthority.getAuthority(); + } - //CAUTION: should not change sqlRequest content! - //sqlRequest.setSql(correctedSql); - } + SQLResponse fakeResponse = TableauInterceptor.tableauIntercept(sqlRequest.getSql()); + if (null != fakeResponse) { + logger.debug("Return fake response, is exception? " + fakeResponse.getIsException()); + return fakeResponse; + } + + String correctedSql = QueryUtil.massageSql(sqlRequest.getSql(), sqlRequest.getProject(), + sqlRequest.getLimit(), sqlRequest.getOffset(), conn.getSchema()); + if (!correctedSql.equals(sqlRequest.getSql())) { + logger.info("The corrected query: " + correctedSql); - // add extra parameters into olap context, like acceptPartial - Map<String, String> parameters = new HashMap<String, String>(); - parameters.put(OLAPContext.PRM_USER_AUTHEN_INFO, userInfo); - parameters.put(OLAPContext.PRM_ACCEPT_PARTIAL_RESULT, String.valueOf(sqlRequest.isAcceptPartial())); - OLAPContext.setParameters(parameters); - // force clear the query context before a new query - OLAPContext.clearThreadLocalContexts(); + //CAUTION: should not change sqlRequest content! + //sqlRequest.setSql(correctedSql); + } - return execute(correctedSql, sqlRequest); + // add extra parameters into olap context, like acceptPartial + Map<String, String> parameters = new HashMap<String, String>(); + parameters.put(OLAPContext.PRM_USER_AUTHEN_INFO, userInfo); + parameters.put(OLAPContext.PRM_ACCEPT_PARTIAL_RESULT, String.valueOf(sqlRequest.isAcceptPartial())); + OLAPContext.setParameters(parameters); + // force clear the query context before a new query + OLAPContext.clearThreadLocalContexts(); + + return execute(correctedSql, sqlRequest, conn); + + } finally { + DBUtils.closeQuietly(conn); + } } @@ -746,8 +755,7 @@ public class QueryService extends BasicService { * @return * @throws Exception */ - private SQLResponse execute(String correctedSql, SQLRequest sqlRequest) throws Exception { - Connection conn = null; + private SQLResponse execute(String correctedSql, SQLRequest sqlRequest, Connection conn) throws Exception { Statement stat = null; ResultSet resultSet = null; Boolean isPushDown = false; @@ -756,7 +764,6 @@ public class QueryService extends BasicService { List<SelectedColumnMeta> columnMetas = Lists.newArrayList(); try { - conn = QueryConnection.getConnection(sqlRequest.getProject()); // special case for prepare query. if (BackdoorToggles.getPrepareOnly()) { @@ -791,13 +798,13 @@ public class QueryService extends BasicService { results.add(oneRow); } } catch (SQLException sqlException) { - isPushDown = PushDownUtil.doPushDownQuery(sqlRequest.getProject(), correctedSql, conn.getSchema(), results, columnMetas, - sqlException); + isPushDown = PushDownUtil.doPushDownQuery(sqlRequest.getProject(), correctedSql, conn.getSchema(), results, + columnMetas, sqlException); if (!isPushDown) { throw sqlException; } } finally { - close(resultSet, stat, conn); + close(resultSet, stat, null);//conn is passed in, not my duty to close } return getSqlResponse(isPushDown, results, columnMetas);