This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit cb3f4e604f1caa8d13de7987912d5638d408bced Author: Pengfei Zhan <[email protected]> AuthorDate: Sat Mar 4 21:06:08 2023 +0800 KYLIN-5523 [FOLLOW UP] expand the filter condition expression to ensure compatibility --- .../kylin/metadata/model/tool/CalciteParser.java | 10 ++ .../kylin/rest/service/ModelSemanticHelper.java | 23 ++-- .../apache/kylin/rest/service/ModelService.java | 27 ++--- .../service/ModelServiceSemanticUpdateTest.java | 55 +++++++-- .../kylin/rest/service/ModelServiceTest.java | 18 +-- .../org/apache/kylin/query/util/EscapeParser.jj | 42 ++++++- .../org/apache/kylin/query/util/EscapeDialect.java | 4 + .../apache/kylin/query/util/EscapeFunction.java | 16 +++ .../org/apache/kylin/query/util/PushDownUtil.java | 17 ++- .../org/apache/kylin/query/util/QueryUtil.java | 15 +++ .../query/util/EscapeTransformerCalciteTest.java | 22 ++++ .../query/util/EscapeTransformerSparkSqlTest.java | 11 ++ .../apache/kylin/query/util/PushDownUtilTest.java | 8 +- .../org/apache/kylin/query/util/QueryUtilTest.java | 10 ++ .../engine/spark/builder/CreateFlatTable.scala | 123 +-------------------- .../kylin/engine/spark/job/FlatTableHelper.scala | 2 - .../job/stage/build/FlatTableAndDictBase.scala | 13 ++- .../spark/smarter/IndexDependencyParser.scala | 34 +++--- 18 files changed, 256 insertions(+), 194 deletions(-) diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/tool/CalciteParser.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/tool/CalciteParser.java index 16c6e85ba6..5a8f0fcf0e 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/tool/CalciteParser.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/tool/CalciteParser.java @@ -29,11 +29,13 @@ import java.util.concurrent.TimeUnit; import org.apache.calcite.avatica.util.Casing; import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.config.NullCollation; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.dialect.HiveSqlDialect; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.sql.parser.SqlParserPos; @@ -58,6 +60,14 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class CalciteParser { + /** + * Overwrite {@link HiveSqlDialect#DEFAULT} with backtick quote. + */ + public static final HiveSqlDialect HIVE_SQL_DIALECT = new HiveSqlDialect( + EMPTY_CONTEXT.withDatabaseProduct(SqlDialect.DatabaseProduct.HIVE) // + .withNullCollation(NullCollation.LOW) // + .withIdentifierQuoteString(Quoting.BACK_TICK.string)); + private CalciteParser() { } diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java index df14faaf5a..38307a881c 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java @@ -39,7 +39,6 @@ import java.util.stream.Collectors; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.dialect.CalciteSqlDialect; -import org.apache.calcite.sql.dialect.HiveSqlDialect; import org.apache.calcite.sql.util.SqlVisitor; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -97,6 +96,7 @@ import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinTableDesc; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2; import org.apache.kylin.query.util.PushDownUtil; +import org.apache.kylin.query.util.QueryUtil; import org.apache.kylin.rest.request.ModelRequest; import org.apache.kylin.rest.response.BuildIndexResponse; import org.apache.kylin.rest.response.SimplifiedMeasure; @@ -436,12 +436,12 @@ public class ModelSemanticHelper extends BasicService { .forEach(x -> x.changeTableAlias(oldAliasName, newAliasName)); model.getComputedColumnDescs().forEach(x -> changeTableAlias(x, oldAliasName, newAliasName)); - String filterCondition = model.getFilterCondition(); - if (StringUtils.isNotEmpty(filterCondition)) { - SqlVisitor<Object> modifyAlias = new ModifyTableNameSqlVisitor(oldAliasName, newAliasName); - SqlNode sqlNode = CalciteParser.getExpNode(filterCondition); - sqlNode.accept(modifyAlias); - String newFilterCondition = sqlNode.toSqlString(CalciteSqlDialect.DEFAULT, true).toString(); + if (StringUtils.isNotBlank(model.getFilterCondition())) { + String expr = QueryUtil.adaptCalciteSyntax(model.getFilterCondition()); + SqlNode sqlNode = CalciteParser.getExpNode(expr); + sqlNode.accept(new ModifyTableNameSqlVisitor(oldAliasName, newAliasName)); + + String newFilterCondition = sqlNode.toSqlString(CalciteParser.HIVE_SQL_DIALECT).toString(); model.setFilterCondition(newFilterCondition); } } @@ -451,7 +451,7 @@ public class ModelSemanticHelper extends BasicService { SqlVisitor<Object> modifyAlias = new ModifyTableNameSqlVisitor(oldAlias, newAlias); SqlNode sqlNode = CalciteParser.getExpNode(computedColumnDesc.getExpression()); sqlNode.accept(modifyAlias); - computedColumnDesc.setExpression(sqlNode.toSqlString(HiveSqlDialect.DEFAULT).toString()); + computedColumnDesc.setExpression(sqlNode.toSqlString(CalciteSqlDialect.DEFAULT).toString()); } private Map<String, String> getAliasTransformMap(NDataModel originModel, NDataModel expectModel) { @@ -868,7 +868,7 @@ public class ModelSemanticHelper extends BasicService { originModel.getEffectiveMeasures().keySet()); } - public boolean isFilterConditonNotChange(String oldFilterCondition, String newFilterCondition) { + public boolean isFilterConditionNotChange(String oldFilterCondition, String newFilterCondition) { oldFilterCondition = oldFilterCondition == null ? "" : oldFilterCondition; newFilterCondition = newFilterCondition == null ? "" : newFilterCondition; return StringUtils.trim(oldFilterCondition).equals(StringUtils.trim(newFilterCondition)); @@ -903,7 +903,7 @@ public class ModelSemanticHelper extends BasicService { return isDifferent(originModel.getPartitionDesc(), newModel.getPartitionDesc()) || !Objects.equals(originModel.getRootFactTable(), newModel.getRootFactTable()) || !originModel.getJoinsGraph().match(newModel.getJoinsGraph(), Maps.newHashMap()) - || !isFilterConditonNotChange(originModel.getFilterCondition(), newModel.getFilterCondition()) + || !isFilterConditionNotChange(originModel.getFilterCondition(), newModel.getFilterCondition()) || !isMultiPartitionDescSame(originModel.getMultiPartitionDesc(), newModel.getMultiPartitionDesc()) || !isAntiFlattenableSame(originModel.getJoinTables(), newModel.getJoinTables()); } @@ -977,7 +977,6 @@ public class ModelSemanticHelper extends BasicService { return SourceFactory.getSource(tableDesc).getSegmentRange(start, end); } - private void handleDatePartitionColumn(NDataModel newModel, NDataflowManager dataflowManager, NDataflow df, String modelId, String project, String start, String end) { // from having partition to no partition @@ -986,7 +985,7 @@ public class ModelSemanticHelper extends BasicService { Lists.newArrayList(SegmentRange.TimePartitionedSegmentRange.createInfinite())); return; } - // change partition column and from no partition to having partition + // change partition column and from no partition to having partition if (StringUtils.isNotEmpty(start) && StringUtils.isNotEmpty(end)) { dataflowManager.fillDfManually(df, Lists.newArrayList(getSegmentRangeByModel(project, modelId, start, end))); diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java index d0073de148..a7b83f9ceb 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java @@ -199,6 +199,7 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.streaming.KafkaConfig; import org.apache.kylin.query.util.PushDownUtil; import org.apache.kylin.query.util.QueryParams; +import org.apache.kylin.query.util.QueryUtil; import org.apache.kylin.rest.aspect.Transaction; import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum; import org.apache.kylin.rest.request.ModelConfigRequest; @@ -1305,7 +1306,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp public String getModelSql(String modelId, String project) { aclEvaluate.checkProjectReadPermission(project); NDataModel model = getManager(NDataModelManager.class, project).getDataModelDesc(modelId); - return PushDownUtil.generateFlatTableSql(model, project, false); + return PushDownUtil.generateFlatTableSql(model, false); } public List<RelatedModelResponse> getRelateModels(String project, String table, String modelId) { @@ -1742,7 +1743,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp if (prjInstance.getSourceType() == ISourceAware.ID_SPARK && model.getModelType() == NDataModel.ModelType.BATCH) { SparkSession ss = SparderEnv.getSparkSession(); - String flatTableSql = PushDownUtil.generateFlatTableSql(model, project, false); + String flatTableSql = PushDownUtil.generateFlatTableSql(model, false); QueryParams queryParams = new QueryParams(project, flatTableSql, "default", false); queryParams.setKylinConfig(prjInstance.getConfig()); queryParams.setAclInfo(AclPermissionUtil.createAclInfo(project, getCurrentUserGroups())); @@ -3622,23 +3623,23 @@ public class ModelService extends AbstractModelService implements TableModelSupp } /** - * massage and update model filter condition - * 1. expand computed columns - * 2. add missing identifier quotes - * 3. add missing table identifiers - * - * @param model + * Convert model filter condition: + * 1. transform special function and backtick + * 2. use Calcite to add table name + * 3. massage for push-down + * 4. update the filter condition */ void massageModelFilterCondition(final NDataModel model) { - if (StringUtils.isEmpty(model.getFilterCondition())) { + String filterCondition = model.getFilterCondition(); + if (StringUtils.isBlank(filterCondition)) { return; } - String filterConditionWithTableName = addTableNameIfNotExist(model.getFilterCondition(), model); + filterCondition = QueryUtil.adaptCalciteSyntax(filterCondition); + String newFilterCondition = addTableNameIfNotExist(filterCondition, model); QueryContext.AclInfo aclInfo = AclPermissionUtil.createAclInfo(model.getProject(), getCurrentUserGroups()); - // validate as soon as possible - PushDownUtil.massageExpression(model, model.getProject(), model.getFilterCondition(), aclInfo); - model.setFilterCondition(filterConditionWithTableName); + String massaged = PushDownUtil.massageExpression(model, model.getProject(), newFilterCondition, aclInfo); + model.setFilterCondition(massaged); } @VisibleForTesting diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java index 9fe9ea8aaf..b65770e577 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java @@ -21,6 +21,8 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.SIMPLIFIED_ import static org.hamcrest.Matchers.is; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -38,6 +40,7 @@ import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.common.util.Unsafe; import org.apache.kylin.cube.model.SelectRule; import org.apache.kylin.engine.spark.job.ExecutableAddCuboidHandler; import org.apache.kylin.engine.spark.job.NSparkCubingJob; @@ -1659,17 +1662,47 @@ public class ModelServiceSemanticUpdateTest extends NLocalFileMetadataTestCase { } @Test - public void testIsFilterConditonNotChange() { - Assert.assertTrue(semanticService.isFilterConditonNotChange(null, null)); - Assert.assertTrue(semanticService.isFilterConditonNotChange("", null)); - Assert.assertTrue(semanticService.isFilterConditonNotChange(null, " ")); - Assert.assertTrue(semanticService.isFilterConditonNotChange(" ", "")); - Assert.assertTrue(semanticService.isFilterConditonNotChange("", " ")); - Assert.assertTrue(semanticService.isFilterConditonNotChange("A=8", " A=8 ")); - - Assert.assertFalse(semanticService.isFilterConditonNotChange(null, "null")); - Assert.assertFalse(semanticService.isFilterConditonNotChange("", "null")); - Assert.assertFalse(semanticService.isFilterConditonNotChange("A=8", "A=9")); + public void testUpdateModelColumnForTableAliasModify() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + NDataModel testModel = getTestModel(); + Map<String, String> map = Maps.newHashMap(); + map.put("TEST_ORDER", "TEST_ORDER1"); + testModel.setFilterCondition("`TEST_ORDER`.`ORDER_ID` > 1"); + ModelSemanticHelper semanticHelper = new ModelSemanticHelper(); + Class<? extends ModelSemanticHelper> clazz = semanticHelper.getClass(); + Method method = clazz.getDeclaredMethod("updateModelColumnForTableAliasModify", NDataModel.class, Map.class); + Unsafe.changeAccessibleObject(method, true); + method.invoke(semanticHelper, testModel, map); + Assert.assertEquals("`TEST_ORDER1`.`ORDER_ID` > 1", testModel.getFilterCondition()); + Unsafe.changeAccessibleObject(method, false); + } + + @Test + public void testChangeTableAlias() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + ComputedColumnDesc cc = new ComputedColumnDesc(); + cc.setExpression("\"TEST_ORDER\".\"ORDER_ID\" + 1"); + ModelSemanticHelper semanticHelper = new ModelSemanticHelper(); + Class<? extends ModelSemanticHelper> clazz = semanticHelper.getClass(); + Method method = clazz.getDeclaredMethod("changeTableAlias", ComputedColumnDesc.class, String.class, + String.class); + Unsafe.changeAccessibleObject(method, true); + method.invoke(semanticHelper, cc, "TEST_ORDER", "TEST_ORDER1"); + Assert.assertEquals("\"TEST_ORDER1\".\"ORDER_ID\" + 1", cc.getExpression()); + Unsafe.changeAccessibleObject(method, false); + } + + @Test + public void testIsFilterConditionNotChange() { + Assert.assertTrue(semanticService.isFilterConditionNotChange(null, null)); + Assert.assertTrue(semanticService.isFilterConditionNotChange("", null)); + Assert.assertTrue(semanticService.isFilterConditionNotChange(null, " ")); + Assert.assertTrue(semanticService.isFilterConditionNotChange(" ", "")); + Assert.assertTrue(semanticService.isFilterConditionNotChange("", " ")); + Assert.assertTrue(semanticService.isFilterConditionNotChange("A=8", " A=8 ")); + + Assert.assertFalse(semanticService.isFilterConditionNotChange(null, "null")); + Assert.assertFalse(semanticService.isFilterConditionNotChange("", "null")); + Assert.assertFalse(semanticService.isFilterConditionNotChange("A=8", "A=9")); } @Test diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java index 58971f625c..1e3e316bd8 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java @@ -3266,8 +3266,8 @@ public class ModelServiceTest extends SourceTestCase { modelRequest.getPartitionDesc().setPartitionDateFormat("yyyy-MM-dd"); String filterCond = "trans_id = 0 and TEST_KYLIN_FACT.order_id < 100 and DEAL_AMOUNT > 123"; - String expectedFilterCond = "(((\"TEST_KYLIN_FACT\".\"TRANS_ID\" = 0) " - + "AND (\"TEST_KYLIN_FACT\".\"ORDER_ID\" < 100)) AND (\"TEST_KYLIN_FACT\".\"DEAL_AMOUNT\" > 123))"; + String expectedFilterCond = "(((`TEST_KYLIN_FACT`.`TRANS_ID` = 0) AND (`TEST_KYLIN_FACT`.`ORDER_ID` < 100)) " + + "AND ((`TEST_KYLIN_FACT`.`PRICE` * `TEST_KYLIN_FACT`.`ITEM_COUNT`) > 123))"; modelRequest.setFilterCondition(filterCond); val newModel = modelService.createModel(modelRequest.getProject(), modelRequest); @@ -3291,7 +3291,7 @@ public class ModelServiceTest extends SourceTestCase { modelRequest.setUuid(null); String filterCond = "\"day\" = 0 and \"123TABLE\".\"day#\" = 1 and \"中文列\" = 1"; - String expectedFilterCond = "(((\"123TABLE\".\"DAY\" = 0) AND (\"123TABLE\".\"day#\" = 1)) AND (\"123TABLE\".\"中文列\" = 1))"; + String expectedFilterCond = "(((`123TABLE`.`DAY` = 0) AND (`123TABLE`.`day#` = 1)) AND (`123TABLE`.`中文列` = 1))"; modelRequest.setFilterCondition(filterCond); val newModel = modelService.createModel(modelRequest.getProject(), modelRequest); @@ -3540,8 +3540,9 @@ public class ModelServiceTest extends SourceTestCase { String originSql = "trans_id = 0 and TEST_KYLIN_FACT.order_id < 100 and DEAL_AMOUNT > 123"; model.setFilterCondition(originSql); modelService.massageModelFilterCondition(model); - Assert.assertEquals("(((\"TEST_KYLIN_FACT\".\"TRANS_ID\" = 0) AND (\"TEST_KYLIN_FACT\".\"ORDER_ID\" < 100)) " - + "AND (\"TEST_KYLIN_FACT\".\"DEAL_AMOUNT\" > 123))", model.getFilterCondition()); + Assert.assertEquals("(((`TEST_KYLIN_FACT`.`TRANS_ID` = 0) " + + "AND (`TEST_KYLIN_FACT`.`ORDER_ID` < 100)) AND ((`TEST_KYLIN_FACT`.`PRICE` * `TEST_KYLIN_FACT`.`ITEM_COUNT`) > 123))", + model.getFilterCondition()); } @Test @@ -3573,7 +3574,7 @@ public class ModelServiceTest extends SourceTestCase { final NDataModel model1 = modelManager.getDataModelDesc(modelId); model1.setFilterCondition("TIMESTAMPDIFF(DAY, CURRENT_DATE, TEST_KYLIN_FACT.\"CURRENT_DATE\") >= 0"); modelService.massageModelFilterCondition(model1); - Assert.assertEquals("(TIMESTAMPDIFF(DAY, CURRENT_DATE(), \"TEST_KYLIN_FACT\".\"CURRENT_DATE\") >= 0)", + Assert.assertEquals("(TIMESTAMPDIFF('DAY', CURRENT_DATE(), `TEST_KYLIN_FACT`.`CURRENT_DATE`) >= 0)", model1.getFilterCondition()); } @@ -3589,8 +3590,9 @@ public class ModelServiceTest extends SourceTestCase { String originSql = "trans_id = 0 and TEST_ORDER.order_id < 100 and DEAL_AMOUNT > 123"; model.setFilterCondition(originSql); modelService.massageModelFilterCondition(model); - Assert.assertEquals("(((\"TEST_KYLIN_FACT\".\"TRANS_ID\" = 0) AND (\"TEST_ORDER\".\"ORDER_ID\" < 100)) " - + "AND (\"TEST_KYLIN_FACT\".\"DEAL_AMOUNT\" > 123))", model.getFilterCondition()); + Assert.assertEquals("(((`TEST_KYLIN_FACT`.`TRANS_ID` = 0) " + + "AND (`TEST_ORDER`.`ORDER_ID` < 100)) AND ((`TEST_KYLIN_FACT`.`PRICE` * `TEST_KYLIN_FACT`.`ITEM_COUNT`) > 123))", + model.getFilterCondition()); } @Test diff --git a/src/query-common/src/main/codegen/javacc/org/apache/kylin/query/util/EscapeParser.jj b/src/query-common/src/main/codegen/javacc/org/apache/kylin/query/util/EscapeParser.jj index d09a3cfb32..f253afc12c 100644 --- a/src/query-common/src/main/codegen/javacc/org/apache/kylin/query/util/EscapeParser.jj +++ b/src/query-common/src/main/codegen/javacc/org/apache/kylin/query/util/EscapeParser.jj @@ -13,7 +13,7 @@ import java.util.List; import java.util.Locale; import java.util.Scanner; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -127,6 +127,8 @@ TOKEN : | < #EXPONENT: ["e","E"] (["+","-"])? (["0"-"9"])+ > | < DOT : "." > | < CEIL : "CEIL" > +| < CEIL_DATETIME : "CEIL_DATETIME" > +| < FLOOR_DATETIME : "FLOOR_DATETIME" > | < FLOOR : "FLOOR" > | < TO : "TO" > | < SUBSTRING : "SUBSTRING" > @@ -237,6 +239,7 @@ String Expression() : | innerString = CastExpression() | innerString = ExtractExpression() | innerString = CeilFloorExpress() + | innerString = CeilFloorDatetimeExpress() | innerString = ParenExpress() | innerString = QuotedString() | innerString = DoubleQuotedString() @@ -662,6 +665,42 @@ String CeilFloorExpress() : } } +String CeilFloorDatetimeExpress() : +{ + String functionName; + List <String> parameters = Lists.newArrayList(); + ImmutableSet<String> timeunitSet = ImmutableSet.of("YEAR", "QUARTER", "MONTH", "WEEK", + "DAY", "HOUR", "MINUTE", "SECOND"); +} +{ + ( < CEIL_DATETIME> | < FLOOR_DATETIME> ) + { + functionName = getToken(0).image; + } + (< SPACE >)* <LPAREN> (< SPACE >)* + { + parameters.add(ParameterExpression().trim()); + } + (< SPACE >)* + ( + <COMMA> (<SPACE>)* + { + String timeUnit = StringUtils.trim(ParameterExpression()); + String unitToValidate = StringUtils.remove(timeUnit, '\''); + if (!timeunitSet.contains(StringUtils.upperCase(unitToValidate))) { + throw new IllegalStateException(String.format(Locale.ROOT, CEIL_FLOOR_EXCEPTION_MSG, + functionName, unitToValidate, String.join(", ", timeunitSet))); + } + parameters.add(timeUnit); + } + (<SPACE>)* + )? + <RPAREN> + { + return dialect.transformFN(functionName, parameters.toArray(new String [ 0 ])); + } +} + String PiFunction() : { String function; @@ -766,6 +805,7 @@ String ParameterExpression() : | innerString = TsDiffOrAddExpression() | innerString = ExtractExpression() | innerString = CeilFloorExpress() + | innerString = CeilFloorDatetimeExpress() | innerString = Numeric() | innerString = Hint() | innerString = CubePriority() diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeDialect.java b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeDialect.java index 0e5a1b34c9..11561ea0aa 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeDialect.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeDialect.java @@ -32,6 +32,8 @@ public abstract class EscapeDialect { private static final String FN_WEEK = "WEEK"; private static final String FN_CEIL = "CEIL"; private static final String FN_FLOOR = "FLOOR"; + private static final String FN_CEIL_DT = "CEIL_DATETIME"; + private static final String FN_FLOOR_DT = "FLOOR_DATETIME"; private static final String FN_SUBSTR = "SUBSTR"; private static final String FN_SUBSTRING = "SUBSTRING"; private static final String FN_ASCII = "ASCII"; @@ -74,6 +76,8 @@ public abstract class EscapeDialect { register(FN_WEEK, FnConversion.WEEK_CALCITE); register(FN_CEIL, FnConversion.CEIL); register(FN_FLOOR, FnConversion.FLOOR); + register(FN_CEIL_DT, FnConversion.CEIL_DT); + register(FN_FLOOR_DT, FnConversion.FLOOR_DT); register(FN_SUBSTR, FnConversion.SUSTR); register(FN_SUBSTRING, FnConversion.SUSTRING); diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeFunction.java b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeFunction.java index 09d8947a99..e297631bed 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeFunction.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeFunction.java @@ -21,6 +21,8 @@ package org.apache.kylin.query.util; import java.util.List; import java.util.Locale; +import org.apache.commons.lang3.StringUtils; + import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -327,6 +329,20 @@ public class EscapeFunction { return normalFN("FLOOR", newArgs); }), + // such as: ceil_datetime(col, 'year') => ceil(col to year) + CEIL_DT(args -> { + Preconditions.checkArgument(args.length == 2, EscapeFunction.CEIL_EXCEPTION_MSG); + String[] newArgs = new String[] { args[0] + " to " + StringUtils.remove(args[1], '\'') }; + return normalFN("CEIL", newArgs); + }), + + // such as: floor_datetime(col, 'year') => floor(col to year) + FLOOR_DT(args -> { + Preconditions.checkArgument(args.length == 2, EscapeFunction.FLOOR_EXCEPTION_MSG); + String[] newArgs = new String[] { args[0] + " to " + StringUtils.remove(args[1], '\'') }; + return normalFN("FLOOR", newArgs); + }), + // tableau func SPACE(args -> { checkArgs(args, 1); diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java index 4f7f467e17..60c02d73dc 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java @@ -173,7 +173,11 @@ public class PushDownUtil { public static String massageExpression(NDataModel model, String project, String expression, QueryContext.AclInfo aclInfo) { - String ccSql = expandComputedColumnExp(model, project, expression); + if (StringUtils.isBlank(expression)) { + return ""; + } + + String ccSql = expandComputedColumnExp(model, project, StringHelper.backtickToDoubleQuote(expression)); QueryParams queryParams = new QueryParams(project, ccSql, PushDownUtil.DEFAULT_SCHEMA, false); queryParams.setKylinConfig(NProjectManager.getProjectConfig(project)); queryParams.setAclInfo(aclInfo); @@ -223,9 +227,10 @@ public class PushDownUtil { } /** - * This method is currently only used for verifying the flat-table generated by the model. + * Generate flat-table SQL which can be parsed by Apache Calcite. + * If querying push-down is required, please use it in conjunction with {@link PushDownUtil#massagePushDownSql}. */ - public static String generateFlatTableSql(NDataModel model, String project, boolean singleLine) { + public static String generateFlatTableSql(NDataModel model, boolean singleLine) { String sep = singleLine ? " " : "\n"; StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("SELECT ").append(sep); @@ -237,7 +242,7 @@ public class PushDownUtil { String allColStr = tblColRefs.stream() // .filter(colRef -> !colRef.getColumnDesc().isComputedColumn()) // .map(colRef -> { - String s = colRef.getTableAlias() + UNDER_LINE + colRef.getName(); + String s = colRef.getTableAlias() + PushDownUtil.UNDER_LINE + colRef.getName(); String colName = StringHelper.doubleQuote(s); return colRef.getDoubleQuoteExp() + " as " + colName + sep; }).collect(Collectors.joining(", ")); @@ -251,8 +256,8 @@ public class PushDownUtil { sqlBuilder.append("WHERE ").append(sep); sqlBuilder.append("1 = 1").append(sep); if (StringUtils.isNotEmpty(model.getFilterCondition())) { - massageExpression(model, project, model.getFilterCondition(), null); - sqlBuilder.append(" AND (").append(model.getFilterCondition()).append(") ").append(sep); + String filterConditionWithCalciteFormat = QueryUtil.adaptCalciteSyntax(model.getFilterCondition()); + sqlBuilder.append(" AND (").append(filterConditionWithCalciteFormat).append(") ").append(sep); } return new EscapeTransformer().transform(sqlBuilder.toString()); diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java index 6459d29a10..77c7b2784d 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java @@ -34,6 +34,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.exception.KylinTimeoutException; import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.StringHelper; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.query.BigQueryThresholdUpdater; import org.apache.kylin.query.IQueryTransformer; @@ -60,6 +61,7 @@ public class QueryUtil { private static final String SEMI_COLON = ";"; public static final String JDBC = "jdbc"; private static final Map<String, IQueryTransformer> QUERY_TRANSFORMER_MAP = Maps.newConcurrentMap(); + private static final EscapeTransformer ESCAPE_TRANSFORMER = new EscapeTransformer(); static { String[] classNames = KylinConfig.getInstanceFromEnv().getQueryTransformers(); @@ -76,6 +78,19 @@ public class QueryUtil { private QueryUtil() { } + /** + * Convert special functions in the input string into the counterpart + * which can be parsed by Apache Calcite. + */ + public static String adaptCalciteSyntax(String str) { + if (StringUtils.isBlank(str)) { + return str; + } + String transformed = ESCAPE_TRANSFORMER.transform(str); + transformed = StringHelper.backtickToDoubleQuote(transformed); + return transformed; + } + public static boolean isSelectStatement(String sql) { String sql1 = sql.toLowerCase(Locale.ROOT); sql1 = removeCommentInSql(sql1); diff --git a/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerCalciteTest.java b/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerCalciteTest.java index baefd0fecd..25bc62e62a 100644 --- a/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerCalciteTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerCalciteTest.java @@ -179,6 +179,28 @@ public class EscapeTransformerCalciteTest { Assert.assertEquals(expectedSQL, transformedSQL); } + @Test + public void ceilFloorDtTest() { + String originSQL = "select ceil_datetime('2012-02-02 00:23:23', 'year'), ceil(floor_datetime(col, 'hour') to day)"; + String expectedSQL = "select CEIL('2012-02-02 00:23:23' to year), CEIL(FLOOR(col to hour) to day)"; + + String transformedSQL = transformer.transform(originSQL); + Assert.assertEquals(expectedSQL, transformedSQL); + } + + @Test + public void testFailToTransformCeilFloorDt() { + { + String origin = "select ceil_datetime('2012-02-02 00:23:23')"; + Assert.assertEquals(origin, transformer.transform(origin)); + } + + { + String origin = "select floor_datetime('2012-02-02 00:23:23')"; + Assert.assertEquals(origin, transformer.transform(origin)); + } + } + @Test public void testCeilFloorQuery() { String originSql = "SELECT {FN WEEK(CEIL( FLOOR(\t TIME2 TO HOUR ) TO DAY )) }, FLOOR(SELLER_ID), CEIL(SELLER_ID) FROM TEST_MEASURE"; diff --git a/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerSparkSqlTest.java b/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerSparkSqlTest.java index 8523f0b608..ad9701d1f9 100644 --- a/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerSparkSqlTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerSparkSqlTest.java @@ -157,6 +157,17 @@ public class EscapeTransformerSparkSqlTest { Assert.assertEquals(expectedSQL, transformedSQL); } + @Test + public void ceilFloorDtTest() { + // The case ceil(floor_datetime(col, 'hour') to day) won't happen, + // so just give follow example to illustrate the normal case won't be replaced. + String originSQL = "select ceil_datetime('2012-02-02 00:23:23', 'year'), ceil_datetime(floor_datetime(col, 'hour'), 'day')"; + String expectedSQL = "select ceil_datetime('2012-02-02 00:23:23', 'year'), ceil_datetime(floor_datetime(col, 'hour'), 'day')"; + + String transformedSQL = transformer.transform(originSQL); + Assert.assertEquals(expectedSQL, transformedSQL); + } + @Test public void testCeilFloorQuery() { String originSql = "SELECT {FN WEEK(CEIL( FLOOR(\t TIME2 TO HOUR ) TO DAY )) }, FLOOR(SELLER_ID), CEIL(SELLER_ID) FROM TEST_MEASURE"; diff --git a/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java index 379af78e8e..5b91a8cef2 100644 --- a/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java @@ -167,7 +167,7 @@ public class PushDownUtilTest extends NLocalFileMetadataTestCase { + "ON \"TEST_BANK_INCOME\".\"COUNTRY\" = \"TEST_BANK_LOCATION\".\"COUNTRY\"\n" // + "WHERE\n" // + "1 = 1"; - Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(model, project, false)); + Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(model, false)); } @Test @@ -204,7 +204,7 @@ public class PushDownUtilTest extends NLocalFileMetadataTestCase { + "WHERE\n" // + "1 = 1"; NDataModel updatedModel = modelManager.getDataModelDesc(model.getUuid()); - Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(updatedModel, project, false)); + Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(updatedModel, false)); } @@ -239,7 +239,7 @@ public class PushDownUtilTest extends NLocalFileMetadataTestCase { + "1 = 1\n" // + " AND (SUBSTRING(\"TEST_BANK_INCOME\".\"COUNTRY\", 0, 4) = 'china' and cc1 = 'china')"; NDataModel updatedModel = modelManager.getDataModelDesc(model.getUuid()); - Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(updatedModel, project, false)); + Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(updatedModel, false)); } @Test @@ -272,7 +272,7 @@ public class PushDownUtilTest extends NLocalFileMetadataTestCase { + "1 = 1\n" // + " AND (TIMESTAMPADD(day, 1, current_date) = '2012-01-01' and cc1 = 'china')"; NDataModel updatedModel = modelManager.getDataModelDesc(model.getUuid()); - Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(updatedModel, project, false)); + Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(updatedModel, false)); } private void updateModelToAddCC(String project, NDataModel model) { diff --git a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java index 0b2120263f..022f457308 100644 --- a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java @@ -423,6 +423,16 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase { newSql2); } + @Test + public void testAdaptCalciteSyntax() { + Assert.assertEquals("a\"b(", QueryUtil.adaptCalciteSyntax("a\"b(")); + Assert.assertEquals(" ", QueryUtil.adaptCalciteSyntax(" ")); + Assert.assertEquals("", QueryUtil.adaptCalciteSyntax("")); + Assert.assertEquals("CEIL(col to year)", QueryUtil.adaptCalciteSyntax("ceil_datetime(col, 'year')")); + Assert.assertEquals("CEIL(\"t\".\"col\" to year)", + QueryUtil.adaptCalciteSyntax("ceil_datetime(`t`.`col`, 'year')")); + } + @Test public void testLimitOffsetMatch() { KylinConfig config = KylinConfig.getInstanceFromEnv(); diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala index fdb4d87cd2..bdcd74ea3c 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala @@ -18,28 +18,27 @@ package org.apache.kylin.engine.spark.builder -import java.util -import com.google.common.collect.Sets -import org.apache.kylin.engine.spark.builder.DFBuilderHelper.{ENCODE_SUFFIX, _} +import java.util.Locale + +import org.apache.commons.lang3.StringUtils +import org.apache.kylin.engine.spark.builder.DFBuilderHelper._ import org.apache.kylin.engine.spark.job.NSparkCubingUtil._ import org.apache.kylin.engine.spark.job.{FlatTableHelper, TableMetaManager} import org.apache.kylin.engine.spark.utils.SparkDataSource._ import org.apache.kylin.engine.spark.utils.{LogEx, LogUtils} import org.apache.kylin.metadata.cube.cuboid.NSpanningTree import org.apache.kylin.metadata.cube.model.{NCubeJoinedFlatTableDesc, NDataSegment} -import org.apache.kylin.metadata.model.NDataModel -import org.apache.commons.lang3.StringUtils import org.apache.kylin.metadata.model._ import org.apache.spark.sql.functions.{col, expr} import org.apache.spark.sql.{Dataset, Row, SparkSession} -import java.util.Locale import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.collection.mutable.ListBuffer import scala.collection.parallel.ForkJoinTaskSupport import scala.concurrent.forkjoin.ForkJoinPool +import com.google.common.collect.Sets + @Deprecated class CreateFlatTable(val flatTable: IJoinedFlatTableDesc, @@ -323,116 +322,6 @@ object CreateFlatTable extends LogEx { newdf } - /* - * Convert IJoinedFlatTableDesc to SQL statement - */ - def generateSelectDataStatement(flatDesc: IJoinedFlatTableDesc, - singleLine: Boolean, - skipAs: Array[String]): String = { - val sep: String = { - if (singleLine) " " - else "\n" - } - val skipAsList = { - if (skipAs == null) ListBuffer.empty[String] - else skipAs.toList - } - val sql: StringBuilder = new StringBuilder - sql.append("SELECT" + sep) - for (i <- 0 until flatDesc.getAllColumns.size()) { - val col: TblColRef = flatDesc.getAllColumns.get(i) - sql.append(",") - val colTotalName: String = - String.format(Locale.ROOT, "%s.%s", col.getTableRef.getTableName, col.getName) - if (skipAsList.contains(colTotalName)) { - sql.append(col.getExpressionInSourceDB + sep) - } else { - sql.append(col.getExpressionInSourceDB + " as " + colName(col) + sep) - } - } - appendJoinStatement(flatDesc, sql, singleLine) - appendWhereStatement(flatDesc, sql, singleLine) - sql.toString - } - - def appendJoinStatement(flatDesc: IJoinedFlatTableDesc, - sql: StringBuilder, - singleLine: Boolean): Unit = { - val sep: String = - if (singleLine) " " - else "\n" - val dimTableCache: util.Set[TableRef] = Sets.newHashSet[TableRef] - val model: NDataModel = flatDesc.getDataModel - val rootTable: TableRef = model.getRootFactTable - sql.append( - "FROM " + flatDesc.getDataModel.getRootFactTable.getTableIdentity + " as " + rootTable.getAlias + " " + sep) - for (lookupDesc <- model.getJoinTables.asScala) { - val join: JoinDesc = lookupDesc.getJoin - if (join != null && join.getType == "" == false) { - val joinType: String = join.getType.toUpperCase(Locale.ROOT) - val dimTable: TableRef = lookupDesc.getTableRef - if (!dimTableCache.contains(dimTable)) { - val pk: Array[TblColRef] = join.getPrimaryKeyColumns - val fk: Array[TblColRef] = join.getForeignKeyColumns - if (pk.length != fk.length) { - throw new RuntimeException( - "Invalid join condition of lookup table:" + lookupDesc) - } - sql.append( - joinType + " JOIN " + dimTable.getTableIdentity + " as " + dimTable.getAlias + sep) - sql.append("ON ") - var i: Int = 0 - while ( { - i < pk.length - }) { - if (i > 0) sql.append(" AND ") - sql.append( - fk(i).getExpressionInSourceDB + " = " + pk(i).getExpressionInSourceDB) - - { - i += 1 - i - 1 - } - } - sql.append(sep) - dimTableCache.add(dimTable) - } - } - } - } - - private def appendWhereStatement(flatDesc: IJoinedFlatTableDesc, - sql: StringBuilder, - singleLine: Boolean): Unit = { - val sep: String = - if (singleLine) " " - else "\n" - val whereBuilder: StringBuilder = new StringBuilder - whereBuilder.append("WHERE 1=1") - val model: NDataModel = flatDesc.getDataModel - if (StringUtils.isNotEmpty(model.getFilterCondition)) { - whereBuilder - .append(" AND (") - .append(model.getFilterCondition) - .append(") ") - } - val partDesc: PartitionDesc = model.getPartitionDesc - val segRange: SegmentRange[_ <: Comparable[_]] = flatDesc.getSegRange - if (flatDesc.getSegment != null && partDesc != null - && partDesc.getPartitionDateColumn != null && segRange != null && !segRange.isInfinite) { - val builder = - flatDesc.getDataModel.getPartitionDesc.getPartitionConditionBuilder - if (builder != null) { - whereBuilder.append(" AND (") - whereBuilder.append( - builder - .buildDateRangeCondition(partDesc, flatDesc.getSegment, segRange)) - whereBuilder.append(")" + sep) - } - - sql.append(whereBuilder.toString) - } - } def colName(col: TblColRef): String = { col.getTableAlias + "_" + col.getName diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala index 5964c661a6..3ba93d705e 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala @@ -21,7 +21,6 @@ package org.apache.kylin.engine.spark.job import org.apache.commons.lang3.StringUtils import org.apache.kylin.engine.spark.builder.CreateFlatTable.replaceDot import org.apache.kylin.metadata.model.IJoinedFlatTableDesc -import org.apache.kylin.query.util.PushDownUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, Row} @@ -56,7 +55,6 @@ object FlatTableHelper extends Logging { if (StringUtils.isNotBlank(model.getFilterCondition)) { var filterCond = model.getFilterCondition - filterCond = PushDownUtil.massageExpression(model, model.getProject, filterCond, null); if (needReplaceDot) filterCond = replaceDot(filterCond, model) filterCond = s" (1=1) AND (" + filterCond + s")" logInfo(s"Filter condition is $filterCond") diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala index 0599fc95ae..93df9cf9a4 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala @@ -18,7 +18,9 @@ package org.apache.kylin.engine.spark.job.stage.build -import com.google.common.collect.Sets +import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.{Locale, Objects, Timer, TimerTask} + import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path import org.apache.kylin.common.util.HadoopUtil @@ -40,7 +42,6 @@ import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBu import org.apache.kylin.metadata.cube.model.NDataSegment import org.apache.kylin.metadata.cube.planner.CostBasePlannerUtils import org.apache.kylin.metadata.model._ -import org.apache.kylin.query.util.PushDownUtil import org.apache.spark.sql.KapFunctions.dict_encode_v3 import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, expr} @@ -64,6 +65,8 @@ import scala.concurrent.duration.{Duration, MILLISECONDS} import scala.concurrent.forkjoin.ForkJoinPool import scala.util.{Failure, Success, Try} +import com.google.common.collect.Sets + abstract class FlatTableAndDictBase(private val jobContext: SegmentJob, private val dataSegment: NDataSegment, private val buildParam: BuildParam) @@ -318,12 +321,12 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob, } private def applyFilterCondition(originDS: Dataset[Row]): Dataset[Row] = { - if (StringUtils.isBlank(dataModel.getFilterCondition)) { + val filterCondition = dataModel.getFilterCondition + if (StringUtils.isBlank(filterCondition)) { logInfo(s"No available FILTER-CONDITION segment $segmentId") return originDS } - val expression = PushDownUtil.massageExpression(dataModel, project, dataModel.getFilterCondition, null) - val converted = replaceDot(expression, dataModel) + val converted = replaceDot(filterCondition, dataModel) val condition = s" (1=1) AND ($converted)" logInfo(s"Apply FILTER-CONDITION: $condition segment $segmentId") originDS.where(condition) diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala index 04234294c9..cb8229e705 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala @@ -17,7 +17,9 @@ */ package org.apache.kylin.engine.spark.smarter -import com.google.common.collect.{Lists, Maps, Sets} +import java.util +import java.util.Collections + import org.apache.commons.collections.CollectionUtils import org.apache.commons.lang3.StringUtils import org.apache.kylin.engine.spark.job.NSparkCubingUtil @@ -29,11 +31,11 @@ import org.apache.spark.sql.execution.utils.SchemaProcessor import org.apache.spark.sql.types.StructField import org.apache.spark.sql.{Dataset, Row, SparderEnv, SparkSession} -import java.util -import java.util.Collections import scala.collection.JavaConverters._ import scala.collection.mutable +import com.google.common.collect.{Lists, Maps, Sets} + class IndexDependencyParser(val model: NDataModel) { private val ccTableNameAliasMap = Maps.newHashMap[String, util.Set[String]] @@ -95,7 +97,7 @@ class IndexDependencyParser(val model: NDataModel) { model.getEffectiveMeasures.get(id).getFunction.getParameters == null) } - private def getTableIdentitiesFromColumn(ref: TblColRef) = { + private def getTableIdentitiesFromColumn(ref: TblColRef): util.HashSet[String] = { val desc = ref.getColumnDesc if (desc.isComputedColumn) { Sets.newHashSet(ccTableNameAliasMap.get(ref.getName)) @@ -172,18 +174,20 @@ class IndexDependencyParser(val model: NDataModel) { result } - private def initFilterConditionTableNames(originDf: Dataset[Row], colFields: Array[StructField]): Unit = - if (StringUtils.isNotEmpty(model.getFilterCondition)) { - val whereDs = originDf.selectExpr(NSparkCubingUtil.convertFromDotWithBackTick(model.getFilterCondition.replace("\"", "`"))) - whereDs.schema.fields.foreach(whereField => { - colFields.foreach(colField => { - if (whereField.name.contains(colField.name)) { - val tableName = colField.name.substring(0, colField.name.indexOf(NSparkCubingUtil.SEPARATOR)) - allTablesAlias.add(model.getTableNameMap.get(tableName).getAlias) - } - }) - }) + private def initFilterConditionTableNames(originDf: Dataset[Row], colFields: Array[StructField]): Unit = { + if (StringUtils.isBlank(model.getFilterCondition)) { + return } + val whereDs = originDf.selectExpr(NSparkCubingUtil.convertFromDotWithBackTick(model.getFilterCondition)) + whereDs.schema.fields.foreach(whereField => { + colFields.foreach(colField => { + if (whereField.name.contains(colField.name)) { + val tableName = colField.name.substring(0, colField.name.indexOf(NSparkCubingUtil.SEPARATOR)) + allTablesAlias.add(model.getTableNameMap.get(tableName).getAlias) + } + }) + }) + } private def initPartitionColumnTableNames(): Unit = { if (model.getPartitionDesc != null && model.getPartitionDesc.getPartitionDateColumnRef != null) {
