This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit cb3f4e604f1caa8d13de7987912d5638d408bced
Author: Pengfei Zhan <[email protected]>
AuthorDate: Sat Mar 4 21:06:08 2023 +0800

    KYLIN-5523 [FOLLOW UP] expand the filter condition expression to ensure 
compatibility
---
 .../kylin/metadata/model/tool/CalciteParser.java   |  10 ++
 .../kylin/rest/service/ModelSemanticHelper.java    |  23 ++--
 .../apache/kylin/rest/service/ModelService.java    |  27 ++---
 .../service/ModelServiceSemanticUpdateTest.java    |  55 +++++++--
 .../kylin/rest/service/ModelServiceTest.java       |  18 +--
 .../org/apache/kylin/query/util/EscapeParser.jj    |  42 ++++++-
 .../org/apache/kylin/query/util/EscapeDialect.java |   4 +
 .../apache/kylin/query/util/EscapeFunction.java    |  16 +++
 .../org/apache/kylin/query/util/PushDownUtil.java  |  17 ++-
 .../org/apache/kylin/query/util/QueryUtil.java     |  15 +++
 .../query/util/EscapeTransformerCalciteTest.java   |  22 ++++
 .../query/util/EscapeTransformerSparkSqlTest.java  |  11 ++
 .../apache/kylin/query/util/PushDownUtilTest.java  |   8 +-
 .../org/apache/kylin/query/util/QueryUtilTest.java |  10 ++
 .../engine/spark/builder/CreateFlatTable.scala     | 123 +--------------------
 .../kylin/engine/spark/job/FlatTableHelper.scala   |   2 -
 .../job/stage/build/FlatTableAndDictBase.scala     |  13 ++-
 .../spark/smarter/IndexDependencyParser.scala      |  34 +++---
 18 files changed, 256 insertions(+), 194 deletions(-)

diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/tool/CalciteParser.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/tool/CalciteParser.java
index 16c6e85ba6..5a8f0fcf0e 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/tool/CalciteParser.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/tool/CalciteParser.java
@@ -29,11 +29,13 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.calcite.avatica.util.Casing;
 import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.config.NullCollation;
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.dialect.HiveSqlDialect;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParserPos;
@@ -58,6 +60,14 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class CalciteParser {
 
+    /**
+     * Overwrite {@link HiveSqlDialect#DEFAULT} with backtick quote. 
+     */
+    public static final HiveSqlDialect HIVE_SQL_DIALECT = new HiveSqlDialect(
+            EMPTY_CONTEXT.withDatabaseProduct(SqlDialect.DatabaseProduct.HIVE) 
//
+                    .withNullCollation(NullCollation.LOW) //
+                    .withIdentifierQuoteString(Quoting.BACK_TICK.string));
+
     private CalciteParser() {
     }
 
diff --git 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
index df14faaf5a..38307a881c 100644
--- 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
+++ 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
@@ -39,7 +39,6 @@ import java.util.stream.Collectors;
 
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
-import org.apache.calcite.sql.dialect.HiveSqlDialect;
 import org.apache.calcite.sql.util.SqlVisitor;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -97,6 +96,7 @@ import 
org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinTableDesc;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2;
 import org.apache.kylin.query.util.PushDownUtil;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.request.ModelRequest;
 import org.apache.kylin.rest.response.BuildIndexResponse;
 import org.apache.kylin.rest.response.SimplifiedMeasure;
@@ -436,12 +436,12 @@ public class ModelSemanticHelper extends BasicService {
                     .forEach(x -> x.changeTableAlias(oldAliasName, 
newAliasName));
             model.getComputedColumnDescs().forEach(x -> changeTableAlias(x, 
oldAliasName, newAliasName));
 
-            String filterCondition = model.getFilterCondition();
-            if (StringUtils.isNotEmpty(filterCondition)) {
-                SqlVisitor<Object> modifyAlias = new 
ModifyTableNameSqlVisitor(oldAliasName, newAliasName);
-                SqlNode sqlNode = CalciteParser.getExpNode(filterCondition);
-                sqlNode.accept(modifyAlias);
-                String newFilterCondition = 
sqlNode.toSqlString(CalciteSqlDialect.DEFAULT, true).toString();
+            if (StringUtils.isNotBlank(model.getFilterCondition())) {
+                String expr = 
QueryUtil.adaptCalciteSyntax(model.getFilterCondition());
+                SqlNode sqlNode = CalciteParser.getExpNode(expr);
+                sqlNode.accept(new ModifyTableNameSqlVisitor(oldAliasName, 
newAliasName));
+
+                String newFilterCondition = 
sqlNode.toSqlString(CalciteParser.HIVE_SQL_DIALECT).toString();
                 model.setFilterCondition(newFilterCondition);
             }
         }
@@ -451,7 +451,7 @@ public class ModelSemanticHelper extends BasicService {
         SqlVisitor<Object> modifyAlias = new 
ModifyTableNameSqlVisitor(oldAlias, newAlias);
         SqlNode sqlNode = 
CalciteParser.getExpNode(computedColumnDesc.getExpression());
         sqlNode.accept(modifyAlias);
-        
computedColumnDesc.setExpression(sqlNode.toSqlString(HiveSqlDialect.DEFAULT).toString());
+        
computedColumnDesc.setExpression(sqlNode.toSqlString(CalciteSqlDialect.DEFAULT).toString());
     }
 
     private Map<String, String> getAliasTransformMap(NDataModel originModel, 
NDataModel expectModel) {
@@ -868,7 +868,7 @@ public class ModelSemanticHelper extends BasicService {
                 originModel.getEffectiveMeasures().keySet());
     }
 
-    public boolean isFilterConditonNotChange(String oldFilterCondition, String 
newFilterCondition) {
+    public boolean isFilterConditionNotChange(String oldFilterCondition, 
String newFilterCondition) {
         oldFilterCondition = oldFilterCondition == null ? "" : 
oldFilterCondition;
         newFilterCondition = newFilterCondition == null ? "" : 
newFilterCondition;
         return 
StringUtils.trim(oldFilterCondition).equals(StringUtils.trim(newFilterCondition));
@@ -903,7 +903,7 @@ public class ModelSemanticHelper extends BasicService {
         return isDifferent(originModel.getPartitionDesc(), 
newModel.getPartitionDesc())
                 || !Objects.equals(originModel.getRootFactTable(), 
newModel.getRootFactTable())
                 || 
!originModel.getJoinsGraph().match(newModel.getJoinsGraph(), Maps.newHashMap())
-                || 
!isFilterConditonNotChange(originModel.getFilterCondition(), 
newModel.getFilterCondition())
+                || 
!isFilterConditionNotChange(originModel.getFilterCondition(), 
newModel.getFilterCondition())
                 || 
!isMultiPartitionDescSame(originModel.getMultiPartitionDesc(), 
newModel.getMultiPartitionDesc())
                 || !isAntiFlattenableSame(originModel.getJoinTables(), 
newModel.getJoinTables());
     }
@@ -977,7 +977,6 @@ public class ModelSemanticHelper extends BasicService {
         return SourceFactory.getSource(tableDesc).getSegmentRange(start, end);
     }
 
-
     private void handleDatePartitionColumn(NDataModel newModel, 
NDataflowManager dataflowManager, NDataflow df,
             String modelId, String project, String start, String end) {
         // from having partition to no partition
@@ -986,7 +985,7 @@ public class ModelSemanticHelper extends BasicService {
                     
Lists.newArrayList(SegmentRange.TimePartitionedSegmentRange.createInfinite()));
             return;
         }
-            // change partition column and from no partition to having 
partition
+        // change partition column and from no partition to having partition
         if (StringUtils.isNotEmpty(start) && StringUtils.isNotEmpty(end)) {
             dataflowManager.fillDfManually(df,
                     Lists.newArrayList(getSegmentRangeByModel(project, 
modelId, start, end)));
diff --git 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index d0073de148..a7b83f9ceb 100644
--- 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -199,6 +199,7 @@ import 
org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.streaming.KafkaConfig;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.aspect.Transaction;
 import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum;
 import org.apache.kylin.rest.request.ModelConfigRequest;
@@ -1305,7 +1306,7 @@ public class ModelService extends AbstractModelService 
implements TableModelSupp
     public String getModelSql(String modelId, String project) {
         aclEvaluate.checkProjectReadPermission(project);
         NDataModel model = getManager(NDataModelManager.class, 
project).getDataModelDesc(modelId);
-        return PushDownUtil.generateFlatTableSql(model, project, false);
+        return PushDownUtil.generateFlatTableSql(model, false);
     }
 
     public List<RelatedModelResponse> getRelateModels(String project, String 
table, String modelId) {
@@ -1742,7 +1743,7 @@ public class ModelService extends AbstractModelService 
implements TableModelSupp
             if (prjInstance.getSourceType() == ISourceAware.ID_SPARK
                     && model.getModelType() == NDataModel.ModelType.BATCH) {
                 SparkSession ss = SparderEnv.getSparkSession();
-                String flatTableSql = PushDownUtil.generateFlatTableSql(model, 
project, false);
+                String flatTableSql = PushDownUtil.generateFlatTableSql(model, 
false);
                 QueryParams queryParams = new QueryParams(project, 
flatTableSql, "default", false);
                 queryParams.setKylinConfig(prjInstance.getConfig());
                 
queryParams.setAclInfo(AclPermissionUtil.createAclInfo(project, 
getCurrentUserGroups()));
@@ -3622,23 +3623,23 @@ public class ModelService extends AbstractModelService 
implements TableModelSupp
     }
 
     /**
-     * massage and update model filter condition
-     * 1. expand computed columns
-     * 2. add missing identifier quotes
-     * 3. add missing table identifiers
-     *
-     * @param model
+     * Convert model filter condition:
+     * 1. transform special function and backtick
+     * 2. use Calcite to add table name
+     * 3. massage for push-down
+     * 4. update the filter condition
      */
     void massageModelFilterCondition(final NDataModel model) {
-        if (StringUtils.isEmpty(model.getFilterCondition())) {
+        String filterCondition = model.getFilterCondition();
+        if (StringUtils.isBlank(filterCondition)) {
             return;
         }
 
-        String filterConditionWithTableName = 
addTableNameIfNotExist(model.getFilterCondition(), model);
+        filterCondition = QueryUtil.adaptCalciteSyntax(filterCondition);
+        String newFilterCondition = addTableNameIfNotExist(filterCondition, 
model);
         QueryContext.AclInfo aclInfo = 
AclPermissionUtil.createAclInfo(model.getProject(), getCurrentUserGroups());
-        // validate as soon as possible
-        PushDownUtil.massageExpression(model, model.getProject(), 
model.getFilterCondition(), aclInfo);
-        model.setFilterCondition(filterConditionWithTableName);
+        String massaged = PushDownUtil.massageExpression(model, 
model.getProject(), newFilterCondition, aclInfo);
+        model.setFilterCondition(massaged);
     }
 
     @VisibleForTesting
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
index 9fe9ea8aaf..b65770e577 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
@@ -21,6 +21,8 @@ import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.SIMPLIFIED_
 import static org.hamcrest.Matchers.is;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -38,6 +40,7 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Unsafe;
 import org.apache.kylin.cube.model.SelectRule;
 import org.apache.kylin.engine.spark.job.ExecutableAddCuboidHandler;
 import org.apache.kylin.engine.spark.job.NSparkCubingJob;
@@ -1659,17 +1662,47 @@ public class ModelServiceSemanticUpdateTest extends 
NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testIsFilterConditonNotChange() {
-        Assert.assertTrue(semanticService.isFilterConditonNotChange(null, 
null));
-        Assert.assertTrue(semanticService.isFilterConditonNotChange("", null));
-        Assert.assertTrue(semanticService.isFilterConditonNotChange(null, "    
"));
-        Assert.assertTrue(semanticService.isFilterConditonNotChange("  ", ""));
-        Assert.assertTrue(semanticService.isFilterConditonNotChange("", "      
   "));
-        Assert.assertTrue(semanticService.isFilterConditonNotChange("A=8", " 
A=8   "));
-
-        Assert.assertFalse(semanticService.isFilterConditonNotChange(null, 
"null"));
-        Assert.assertFalse(semanticService.isFilterConditonNotChange("", 
"null"));
-        Assert.assertFalse(semanticService.isFilterConditonNotChange("A=8", 
"A=9"));
+    public void testUpdateModelColumnForTableAliasModify()
+            throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+        NDataModel testModel = getTestModel();
+        Map<String, String> map = Maps.newHashMap();
+        map.put("TEST_ORDER", "TEST_ORDER1");
+        testModel.setFilterCondition("`TEST_ORDER`.`ORDER_ID` > 1");
+        ModelSemanticHelper semanticHelper = new ModelSemanticHelper();
+        Class<? extends ModelSemanticHelper> clazz = semanticHelper.getClass();
+        Method method = 
clazz.getDeclaredMethod("updateModelColumnForTableAliasModify", 
NDataModel.class, Map.class);
+        Unsafe.changeAccessibleObject(method, true);
+        method.invoke(semanticHelper, testModel, map);
+        Assert.assertEquals("`TEST_ORDER1`.`ORDER_ID` > 1", 
testModel.getFilterCondition());
+        Unsafe.changeAccessibleObject(method, false);
+    }
+
+    @Test
+    public void testChangeTableAlias() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
+        ComputedColumnDesc cc = new ComputedColumnDesc();
+        cc.setExpression("\"TEST_ORDER\".\"ORDER_ID\" + 1");
+        ModelSemanticHelper semanticHelper = new ModelSemanticHelper();
+        Class<? extends ModelSemanticHelper> clazz = semanticHelper.getClass();
+        Method method = clazz.getDeclaredMethod("changeTableAlias", 
ComputedColumnDesc.class, String.class,
+                String.class);
+        Unsafe.changeAccessibleObject(method, true);
+        method.invoke(semanticHelper, cc, "TEST_ORDER", "TEST_ORDER1");
+        Assert.assertEquals("\"TEST_ORDER1\".\"ORDER_ID\" + 1", 
cc.getExpression());
+        Unsafe.changeAccessibleObject(method, false);
+    }
+
+    @Test
+    public void testIsFilterConditionNotChange() {
+        Assert.assertTrue(semanticService.isFilterConditionNotChange(null, 
null));
+        Assert.assertTrue(semanticService.isFilterConditionNotChange("", 
null));
+        Assert.assertTrue(semanticService.isFilterConditionNotChange(null, "   
 "));
+        Assert.assertTrue(semanticService.isFilterConditionNotChange("  ", 
""));
+        Assert.assertTrue(semanticService.isFilterConditionNotChange("", "     
    "));
+        Assert.assertTrue(semanticService.isFilterConditionNotChange("A=8", " 
A=8   "));
+
+        Assert.assertFalse(semanticService.isFilterConditionNotChange(null, 
"null"));
+        Assert.assertFalse(semanticService.isFilterConditionNotChange("", 
"null"));
+        Assert.assertFalse(semanticService.isFilterConditionNotChange("A=8", 
"A=9"));
     }
 
     @Test
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
index 58971f625c..1e3e316bd8 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
@@ -3266,8 +3266,8 @@ public class ModelServiceTest extends SourceTestCase {
         modelRequest.getPartitionDesc().setPartitionDateFormat("yyyy-MM-dd");
 
         String filterCond = "trans_id = 0 and TEST_KYLIN_FACT.order_id < 100 
and DEAL_AMOUNT > 123";
-        String expectedFilterCond = "(((\"TEST_KYLIN_FACT\".\"TRANS_ID\" = 0) "
-                + "AND (\"TEST_KYLIN_FACT\".\"ORDER_ID\" < 100)) AND 
(\"TEST_KYLIN_FACT\".\"DEAL_AMOUNT\" > 123))";
+        String expectedFilterCond = "(((`TEST_KYLIN_FACT`.`TRANS_ID` = 0) AND 
(`TEST_KYLIN_FACT`.`ORDER_ID` < 100)) "
+                + "AND ((`TEST_KYLIN_FACT`.`PRICE` * 
`TEST_KYLIN_FACT`.`ITEM_COUNT`) > 123))";
         modelRequest.setFilterCondition(filterCond);
 
         val newModel = modelService.createModel(modelRequest.getProject(), 
modelRequest);
@@ -3291,7 +3291,7 @@ public class ModelServiceTest extends SourceTestCase {
         modelRequest.setUuid(null);
 
         String filterCond = "\"day\" = 0 and \"123TABLE\".\"day#\" = 1 and 
\"中文列\" = 1";
-        String expectedFilterCond = "(((\"123TABLE\".\"DAY\" = 0) AND 
(\"123TABLE\".\"day#\" = 1)) AND (\"123TABLE\".\"中文列\" = 1))";
+        String expectedFilterCond = "(((`123TABLE`.`DAY` = 0) AND 
(`123TABLE`.`day#` = 1)) AND (`123TABLE`.`中文列` = 1))";
         modelRequest.setFilterCondition(filterCond);
 
         val newModel = modelService.createModel(modelRequest.getProject(), 
modelRequest);
@@ -3540,8 +3540,9 @@ public class ModelServiceTest extends SourceTestCase {
         String originSql = "trans_id = 0 and TEST_KYLIN_FACT.order_id < 100 
and DEAL_AMOUNT > 123";
         model.setFilterCondition(originSql);
         modelService.massageModelFilterCondition(model);
-        Assert.assertEquals("(((\"TEST_KYLIN_FACT\".\"TRANS_ID\" = 0) AND 
(\"TEST_KYLIN_FACT\".\"ORDER_ID\" < 100)) "
-                + "AND (\"TEST_KYLIN_FACT\".\"DEAL_AMOUNT\" > 123))", 
model.getFilterCondition());
+        Assert.assertEquals("(((`TEST_KYLIN_FACT`.`TRANS_ID` = 0) "
+                + "AND (`TEST_KYLIN_FACT`.`ORDER_ID` < 100)) AND 
((`TEST_KYLIN_FACT`.`PRICE` * `TEST_KYLIN_FACT`.`ITEM_COUNT`) > 123))",
+                model.getFilterCondition());
     }
 
     @Test
@@ -3573,7 +3574,7 @@ public class ModelServiceTest extends SourceTestCase {
         final NDataModel model1 = modelManager.getDataModelDesc(modelId);
         model1.setFilterCondition("TIMESTAMPDIFF(DAY, CURRENT_DATE, 
TEST_KYLIN_FACT.\"CURRENT_DATE\") >= 0");
         modelService.massageModelFilterCondition(model1);
-        Assert.assertEquals("(TIMESTAMPDIFF(DAY, CURRENT_DATE(), 
\"TEST_KYLIN_FACT\".\"CURRENT_DATE\") >= 0)",
+        Assert.assertEquals("(TIMESTAMPDIFF('DAY', CURRENT_DATE(), 
`TEST_KYLIN_FACT`.`CURRENT_DATE`) >= 0)",
                 model1.getFilterCondition());
 
     }
@@ -3589,8 +3590,9 @@ public class ModelServiceTest extends SourceTestCase {
         String originSql = "trans_id = 0 and TEST_ORDER.order_id < 100 and 
DEAL_AMOUNT > 123";
         model.setFilterCondition(originSql);
         modelService.massageModelFilterCondition(model);
-        Assert.assertEquals("(((\"TEST_KYLIN_FACT\".\"TRANS_ID\" = 0) AND 
(\"TEST_ORDER\".\"ORDER_ID\" < 100)) "
-                + "AND (\"TEST_KYLIN_FACT\".\"DEAL_AMOUNT\" > 123))", 
model.getFilterCondition());
+        Assert.assertEquals("(((`TEST_KYLIN_FACT`.`TRANS_ID` = 0) "
+                + "AND (`TEST_ORDER`.`ORDER_ID` < 100)) AND 
((`TEST_KYLIN_FACT`.`PRICE` * `TEST_KYLIN_FACT`.`ITEM_COUNT`) > 123))",
+                model.getFilterCondition());
     }
 
     @Test
diff --git 
a/src/query-common/src/main/codegen/javacc/org/apache/kylin/query/util/EscapeParser.jj
 
b/src/query-common/src/main/codegen/javacc/org/apache/kylin/query/util/EscapeParser.jj
index d09a3cfb32..f253afc12c 100644
--- 
a/src/query-common/src/main/codegen/javacc/org/apache/kylin/query/util/EscapeParser.jj
+++ 
b/src/query-common/src/main/codegen/javacc/org/apache/kylin/query/util/EscapeParser.jj
@@ -13,7 +13,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Scanner;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -127,6 +127,8 @@ TOKEN :
 | < #EXPONENT: ["e","E"] (["+","-"])? (["0"-"9"])+ >
 | < DOT : "." >
 | < CEIL : "CEIL" >
+| < CEIL_DATETIME : "CEIL_DATETIME" >
+| < FLOOR_DATETIME : "FLOOR_DATETIME" >
 | < FLOOR : "FLOOR" >
 | < TO : "TO" >
 | < SUBSTRING : "SUBSTRING" >
@@ -237,6 +239,7 @@ String Expression() :
     | innerString = CastExpression()
     | innerString = ExtractExpression()
     | innerString = CeilFloorExpress()
+    | innerString = CeilFloorDatetimeExpress()
     | innerString = ParenExpress()
     | innerString = QuotedString()
     | innerString = DoubleQuotedString()
@@ -662,6 +665,42 @@ String CeilFloorExpress() :
     }
 }
 
+String CeilFloorDatetimeExpress() :
+{
+    String functionName;
+    List <String> parameters = Lists.newArrayList();
+    ImmutableSet<String> timeunitSet = ImmutableSet.of("YEAR", "QUARTER", 
"MONTH", "WEEK",
+                       "DAY", "HOUR", "MINUTE", "SECOND");
+}
+{
+    ( < CEIL_DATETIME> | < FLOOR_DATETIME> )
+    {
+        functionName = getToken(0).image;
+    }
+    (< SPACE >)* <LPAREN> (< SPACE >)*
+    {
+       parameters.add(ParameterExpression().trim());
+    }
+     (< SPACE >)*
+     (
+         <COMMA> (<SPACE>)*
+         {
+             String timeUnit = StringUtils.trim(ParameterExpression());
+             String unitToValidate = StringUtils.remove(timeUnit, '\'');
+             if (!timeunitSet.contains(StringUtils.upperCase(unitToValidate))) 
{
+                 throw new IllegalStateException(String.format(Locale.ROOT, 
CEIL_FLOOR_EXCEPTION_MSG,
+                     functionName, unitToValidate, String.join(", ", 
timeunitSet)));
+             }
+             parameters.add(timeUnit);
+         }
+         (<SPACE>)*
+     )?
+     <RPAREN>
+    {
+        return dialect.transformFN(functionName, parameters.toArray(new String 
[ 0 ]));
+    }
+}
+
 String PiFunction() :
 {
     String function;
@@ -766,6 +805,7 @@ String ParameterExpression() :
     | innerString = TsDiffOrAddExpression()
     | innerString = ExtractExpression()
     | innerString = CeilFloorExpress()
+    | innerString = CeilFloorDatetimeExpress()
     | innerString = Numeric()
     | innerString = Hint()
     | innerString = CubePriority()
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeDialect.java 
b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeDialect.java
index 0e5a1b34c9..11561ea0aa 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeDialect.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeDialect.java
@@ -32,6 +32,8 @@ public abstract class EscapeDialect {
     private static final String FN_WEEK = "WEEK";
     private static final String FN_CEIL = "CEIL";
     private static final String FN_FLOOR = "FLOOR";
+    private static final String FN_CEIL_DT = "CEIL_DATETIME";
+    private static final String FN_FLOOR_DT = "FLOOR_DATETIME";
     private static final String FN_SUBSTR = "SUBSTR";
     private static final String FN_SUBSTRING = "SUBSTRING";
     private static final String FN_ASCII = "ASCII";
@@ -74,6 +76,8 @@ public abstract class EscapeDialect {
             register(FN_WEEK, FnConversion.WEEK_CALCITE);
             register(FN_CEIL, FnConversion.CEIL);
             register(FN_FLOOR, FnConversion.FLOOR);
+            register(FN_CEIL_DT, FnConversion.CEIL_DT);
+            register(FN_FLOOR_DT, FnConversion.FLOOR_DT);
             register(FN_SUBSTR, FnConversion.SUSTR);
             register(FN_SUBSTRING, FnConversion.SUSTRING);
 
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeFunction.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeFunction.java
index 09d8947a99..e297631bed 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeFunction.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeFunction.java
@@ -21,6 +21,8 @@ package org.apache.kylin.query.util;
 import java.util.List;
 import java.util.Locale;
 
+import org.apache.commons.lang3.StringUtils;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -327,6 +329,20 @@ public class EscapeFunction {
             return normalFN("FLOOR", newArgs);
         }),
 
+        // such as: ceil_datetime(col, 'year') => ceil(col to year)
+        CEIL_DT(args -> {
+            Preconditions.checkArgument(args.length == 2, 
EscapeFunction.CEIL_EXCEPTION_MSG);
+            String[] newArgs = new String[] { args[0] + " to " + 
StringUtils.remove(args[1], '\'') };
+            return normalFN("CEIL", newArgs);
+        }),
+
+        // such as: floor_datetime(col, 'year') => floor(col to year)
+        FLOOR_DT(args -> {
+            Preconditions.checkArgument(args.length == 2, 
EscapeFunction.FLOOR_EXCEPTION_MSG);
+            String[] newArgs = new String[] { args[0] + " to " + 
StringUtils.remove(args[1], '\'') };
+            return normalFN("FLOOR", newArgs);
+        }),
+
         // tableau func
         SPACE(args -> {
             checkArgs(args, 1);
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java 
b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index 4f7f467e17..60c02d73dc 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -173,7 +173,11 @@ public class PushDownUtil {
 
     public static String massageExpression(NDataModel model, String project, 
String expression,
             QueryContext.AclInfo aclInfo) {
-        String ccSql = expandComputedColumnExp(model, project, expression);
+        if (StringUtils.isBlank(expression)) {
+            return "";
+        }
+
+        String ccSql = expandComputedColumnExp(model, project, 
StringHelper.backtickToDoubleQuote(expression));
         QueryParams queryParams = new QueryParams(project, ccSql, 
PushDownUtil.DEFAULT_SCHEMA, false);
         queryParams.setKylinConfig(NProjectManager.getProjectConfig(project));
         queryParams.setAclInfo(aclInfo);
@@ -223,9 +227,10 @@ public class PushDownUtil {
     }
 
     /**
-     * This method is currently only used for verifying the flat-table 
generated by the model.
+     * Generate flat-table SQL which can be parsed by Apache Calcite. 
+     * If querying push-down is required, please use it in conjunction with 
{@link PushDownUtil#massagePushDownSql}.
      */
-    public static String generateFlatTableSql(NDataModel model, String 
project, boolean singleLine) {
+    public static String generateFlatTableSql(NDataModel model, boolean 
singleLine) {
         String sep = singleLine ? " " : "\n";
         StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("SELECT ").append(sep);
@@ -237,7 +242,7 @@ public class PushDownUtil {
             String allColStr = tblColRefs.stream() //
                     .filter(colRef -> 
!colRef.getColumnDesc().isComputedColumn()) //
                     .map(colRef -> {
-                        String s = colRef.getTableAlias() + UNDER_LINE + 
colRef.getName();
+                        String s = colRef.getTableAlias() + 
PushDownUtil.UNDER_LINE + colRef.getName();
                         String colName = StringHelper.doubleQuote(s);
                         return colRef.getDoubleQuoteExp() + " as " + colName + 
sep;
                     }).collect(Collectors.joining(", "));
@@ -251,8 +256,8 @@ public class PushDownUtil {
         sqlBuilder.append("WHERE ").append(sep);
         sqlBuilder.append("1 = 1").append(sep);
         if (StringUtils.isNotEmpty(model.getFilterCondition())) {
-            massageExpression(model, project, model.getFilterCondition(), 
null);
-            sqlBuilder.append(" AND 
(").append(model.getFilterCondition()).append(") ").append(sep);
+            String filterConditionWithCalciteFormat = 
QueryUtil.adaptCalciteSyntax(model.getFilterCondition());
+            sqlBuilder.append(" AND 
(").append(filterConditionWithCalciteFormat).append(") ").append(sep);
         }
 
         return new EscapeTransformer().transform(sqlBuilder.toString());
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java 
b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
index 6459d29a10..77c7b2784d 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
@@ -34,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinTimeoutException;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.StringHelper;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
 import org.apache.kylin.query.IQueryTransformer;
@@ -60,6 +61,7 @@ public class QueryUtil {
     private static final String SEMI_COLON = ";";
     public static final String JDBC = "jdbc";
     private static final Map<String, IQueryTransformer> QUERY_TRANSFORMER_MAP 
= Maps.newConcurrentMap();
+    private static final EscapeTransformer ESCAPE_TRANSFORMER = new 
EscapeTransformer();
 
     static {
         String[] classNames = 
KylinConfig.getInstanceFromEnv().getQueryTransformers();
@@ -76,6 +78,19 @@ public class QueryUtil {
     private QueryUtil() {
     }
 
+    /**
+     * Convert special functions in the input string into their counterparts
+     * that Apache Calcite can parse.
+     */
+    public static String adaptCalciteSyntax(String str) {
+        if (StringUtils.isBlank(str)) {
+            return str;
+        }
+        String transformed = ESCAPE_TRANSFORMER.transform(str);
+        transformed = StringHelper.backtickToDoubleQuote(transformed);
+        return transformed;
+    }
+
     public static boolean isSelectStatement(String sql) {
         String sql1 = sql.toLowerCase(Locale.ROOT);
         sql1 = removeCommentInSql(sql1);
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerCalciteTest.java
 
b/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerCalciteTest.java
index baefd0fecd..25bc62e62a 100644
--- 
a/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerCalciteTest.java
+++ 
b/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerCalciteTest.java
@@ -179,6 +179,28 @@ public class EscapeTransformerCalciteTest {
         Assert.assertEquals(expectedSQL, transformedSQL);
     }
 
+    @Test
+    public void ceilFloorDtTest() {
+        String originSQL = "select ceil_datetime('2012-02-02 00:23:23', 
'year'), ceil(floor_datetime(col, 'hour') to day)";
+        String expectedSQL = "select CEIL('2012-02-02 00:23:23' to year), 
CEIL(FLOOR(col to hour) to day)";
+
+        String transformedSQL = transformer.transform(originSQL);
+        Assert.assertEquals(expectedSQL, transformedSQL);
+    }
+
+    @Test
+    public void testFailToTransformCeilFloorDt() {
+        {
+            String origin = "select ceil_datetime('2012-02-02 00:23:23')";
+            Assert.assertEquals(origin, transformer.transform(origin));
+        }
+
+        {
+            String origin = "select floor_datetime('2012-02-02 00:23:23')";
+            Assert.assertEquals(origin, transformer.transform(origin));
+        }
+    }
+
     @Test
     public void testCeilFloorQuery() {
         String originSql = "SELECT {FN WEEK(CEIL( FLOOR(\t  TIME2 TO HOUR  ) 
TO DAY    )) }, FLOOR(SELLER_ID), CEIL(SELLER_ID) FROM TEST_MEASURE";
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerSparkSqlTest.java
 
b/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerSparkSqlTest.java
index 8523f0b608..ad9701d1f9 100644
--- 
a/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerSparkSqlTest.java
+++ 
b/src/query/src/test/java/org/apache/kylin/query/util/EscapeTransformerSparkSqlTest.java
@@ -157,6 +157,17 @@ public class EscapeTransformerSparkSqlTest {
         Assert.assertEquals(expectedSQL, transformedSQL);
     }
 
+    @Test
+    public void ceilFloorDtTest() {
+    // The case ceil(floor_datetime(col, 'hour') to day) won't happen,
+    // so the following example illustrates that the normal case won't be
replaced.
+        String originSQL = "select ceil_datetime('2012-02-02 00:23:23',    
'year'), ceil_datetime(floor_datetime(col, 'hour'),  'day')";
+        String expectedSQL = "select ceil_datetime('2012-02-02 00:23:23', 
'year'), ceil_datetime(floor_datetime(col, 'hour'), 'day')";
+
+        String transformedSQL = transformer.transform(originSQL);
+        Assert.assertEquals(expectedSQL, transformedSQL);
+    }
+
     @Test
     public void testCeilFloorQuery() {
         String originSql = "SELECT {FN WEEK(CEIL( FLOOR(\t  TIME2 TO HOUR  ) 
TO DAY    )) }, FLOOR(SELLER_ID), CEIL(SELLER_ID) FROM TEST_MEASURE";
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java 
b/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java
index 379af78e8e..5b91a8cef2 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java
@@ -167,7 +167,7 @@ public class PushDownUtilTest extends 
NLocalFileMetadataTestCase {
                 + "ON \"TEST_BANK_INCOME\".\"COUNTRY\" = 
\"TEST_BANK_LOCATION\".\"COUNTRY\"\n" //
                 + "WHERE\n" //
                 + "1 = 1";
-        Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(model, 
project, false));
+        Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(model, 
false));
     }
 
     @Test
@@ -204,7 +204,7 @@ public class PushDownUtilTest extends 
NLocalFileMetadataTestCase {
                 + "WHERE\n" //
                 + "1 = 1";
         NDataModel updatedModel = 
modelManager.getDataModelDesc(model.getUuid());
-        Assert.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, project, false));
+        Assert.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, false));
 
     }
 
@@ -239,7 +239,7 @@ public class PushDownUtilTest extends 
NLocalFileMetadataTestCase {
                 + "1 = 1\n" //
                 + " AND (SUBSTRING(\"TEST_BANK_INCOME\".\"COUNTRY\", 0, 4) = 
'china' and cc1 = 'china')";
         NDataModel updatedModel = 
modelManager.getDataModelDesc(model.getUuid());
-        Assert.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, project, false));
+        Assert.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, false));
     }
 
     @Test
@@ -272,7 +272,7 @@ public class PushDownUtilTest extends 
NLocalFileMetadataTestCase {
                 + "1 = 1\n" //
                 + " AND (TIMESTAMPADD(day, 1, current_date) = '2012-01-01' and 
cc1 = 'china')";
         NDataModel updatedModel = 
modelManager.getDataModelDesc(model.getUuid());
-        Assert.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, project, false));
+        Assert.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, false));
     }
 
     private void updateModelToAddCC(String project, NDataModel model) {
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java 
b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
index 0b2120263f..022f457308 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
@@ -423,6 +423,16 @@ public class QueryUtilTest extends 
NLocalFileMetadataTestCase {
                 newSql2);
     }
 
+    @Test
+    public void testAdaptCalciteSyntax() {
+        Assert.assertEquals("a\"b(", QueryUtil.adaptCalciteSyntax("a\"b("));
+        Assert.assertEquals("  ", QueryUtil.adaptCalciteSyntax("  "));
+        Assert.assertEquals("", QueryUtil.adaptCalciteSyntax(""));
+        Assert.assertEquals("CEIL(col to year)", 
QueryUtil.adaptCalciteSyntax("ceil_datetime(col, 'year')"));
+        Assert.assertEquals("CEIL(\"t\".\"col\" to year)",
+                QueryUtil.adaptCalciteSyntax("ceil_datetime(`t`.`col`, 
'year')"));
+    }
+
     @Test
     public void testLimitOffsetMatch() {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
index fdb4d87cd2..bdcd74ea3c 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
@@ -18,28 +18,27 @@
 
 package org.apache.kylin.engine.spark.builder
 
-import java.util
-import com.google.common.collect.Sets
-import org.apache.kylin.engine.spark.builder.DFBuilderHelper.{ENCODE_SUFFIX, _}
+import java.util.Locale
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.kylin.engine.spark.builder.DFBuilderHelper._
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
 import org.apache.kylin.engine.spark.job.{FlatTableHelper, TableMetaManager}
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
 import org.apache.kylin.engine.spark.utils.{LogEx, LogUtils}
 import org.apache.kylin.metadata.cube.cuboid.NSpanningTree
 import org.apache.kylin.metadata.cube.model.{NCubeJoinedFlatTableDesc, 
NDataSegment}
-import org.apache.kylin.metadata.model.NDataModel
-import org.apache.commons.lang3.StringUtils
 import org.apache.kylin.metadata.model._
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
-import java.util.Locale
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 
+import com.google.common.collect.Sets
+
 
 @Deprecated
 class CreateFlatTable(val flatTable: IJoinedFlatTableDesc,
@@ -323,116 +322,6 @@ object CreateFlatTable extends LogEx {
     newdf
   }
 
-  /*
-   * Convert IJoinedFlatTableDesc to SQL statement
-   */
-  def generateSelectDataStatement(flatDesc: IJoinedFlatTableDesc,
-                                  singleLine: Boolean,
-                                  skipAs: Array[String]): String = {
-    val sep: String = {
-      if (singleLine) " "
-      else "\n"
-    }
-    val skipAsList = {
-      if (skipAs == null) ListBuffer.empty[String]
-      else skipAs.toList
-    }
-    val sql: StringBuilder = new StringBuilder
-    sql.append("SELECT" + sep)
-    for (i <- 0 until flatDesc.getAllColumns.size()) {
-      val col: TblColRef = flatDesc.getAllColumns.get(i)
-      sql.append(",")
-      val colTotalName: String =
-        String.format(Locale.ROOT, "%s.%s", col.getTableRef.getTableName, 
col.getName)
-      if (skipAsList.contains(colTotalName)) {
-        sql.append(col.getExpressionInSourceDB + sep)
-      } else {
-        sql.append(col.getExpressionInSourceDB + " as " + colName(col) + sep)
-      }
-    }
-    appendJoinStatement(flatDesc, sql, singleLine)
-    appendWhereStatement(flatDesc, sql, singleLine)
-    sql.toString
-  }
-
-  def appendJoinStatement(flatDesc: IJoinedFlatTableDesc,
-                          sql: StringBuilder,
-                          singleLine: Boolean): Unit = {
-    val sep: String =
-      if (singleLine) " "
-      else "\n"
-    val dimTableCache: util.Set[TableRef] = Sets.newHashSet[TableRef]
-    val model: NDataModel = flatDesc.getDataModel
-    val rootTable: TableRef = model.getRootFactTable
-    sql.append(
-      "FROM " + flatDesc.getDataModel.getRootFactTable.getTableIdentity + " as 
" + rootTable.getAlias + " " + sep)
-    for (lookupDesc <- model.getJoinTables.asScala) {
-      val join: JoinDesc = lookupDesc.getJoin
-      if (join != null && join.getType == "" == false) {
-        val joinType: String = join.getType.toUpperCase(Locale.ROOT)
-        val dimTable: TableRef = lookupDesc.getTableRef
-        if (!dimTableCache.contains(dimTable)) {
-          val pk: Array[TblColRef] = join.getPrimaryKeyColumns
-          val fk: Array[TblColRef] = join.getForeignKeyColumns
-          if (pk.length != fk.length) {
-            throw new RuntimeException(
-              "Invalid join condition of lookup table:" + lookupDesc)
-          }
-          sql.append(
-            joinType + " JOIN " + dimTable.getTableIdentity + " as " + 
dimTable.getAlias + sep)
-          sql.append("ON ")
-          var i: Int = 0
-          while ( {
-            i < pk.length
-          }) {
-            if (i > 0) sql.append(" AND ")
-            sql.append(
-              fk(i).getExpressionInSourceDB + " = " + 
pk(i).getExpressionInSourceDB)
-
-            {
-              i += 1
-              i - 1
-            }
-          }
-          sql.append(sep)
-          dimTableCache.add(dimTable)
-        }
-      }
-    }
-  }
-
-  private def appendWhereStatement(flatDesc: IJoinedFlatTableDesc,
-                                   sql: StringBuilder,
-                                   singleLine: Boolean): Unit = {
-    val sep: String =
-      if (singleLine) " "
-      else "\n"
-    val whereBuilder: StringBuilder = new StringBuilder
-    whereBuilder.append("WHERE 1=1")
-    val model: NDataModel = flatDesc.getDataModel
-    if (StringUtils.isNotEmpty(model.getFilterCondition)) {
-      whereBuilder
-        .append(" AND (")
-        .append(model.getFilterCondition)
-        .append(") ")
-    }
-    val partDesc: PartitionDesc = model.getPartitionDesc
-    val segRange: SegmentRange[_ <: Comparable[_]] = flatDesc.getSegRange
-    if (flatDesc.getSegment != null && partDesc != null
-      && partDesc.getPartitionDateColumn != null && segRange != null && 
!segRange.isInfinite) {
-      val builder =
-        flatDesc.getDataModel.getPartitionDesc.getPartitionConditionBuilder
-      if (builder != null) {
-        whereBuilder.append(" AND (")
-        whereBuilder.append(
-          builder
-            .buildDateRangeCondition(partDesc, flatDesc.getSegment, segRange))
-        whereBuilder.append(")" + sep)
-      }
-
-      sql.append(whereBuilder.toString)
-    }
-  }
 
   def colName(col: TblColRef): String = {
     col.getTableAlias + "_" + col.getName
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala
index 5964c661a6..3ba93d705e 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.spark.job
 import org.apache.commons.lang3.StringUtils
 import org.apache.kylin.engine.spark.builder.CreateFlatTable.replaceDot
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc
-import org.apache.kylin.query.util.PushDownUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, Row}
 
@@ -56,7 +55,6 @@ object FlatTableHelper extends Logging {
 
     if (StringUtils.isNotBlank(model.getFilterCondition)) {
       var filterCond = model.getFilterCondition
-      filterCond = PushDownUtil.massageExpression(model, model.getProject, 
filterCond, null);
       if (needReplaceDot) filterCond = replaceDot(filterCond, model)
       filterCond = s" (1=1) AND (" + filterCond + s")"
       logInfo(s"Filter condition is $filterCond")
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index 0599fc95ae..93df9cf9a4 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -18,7 +18,9 @@
 
 package org.apache.kylin.engine.spark.job.stage.build
 
-import com.google.common.collect.Sets
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Locale, Objects, Timer, TimerTask}
+
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.util.HadoopUtil
@@ -40,7 +42,6 @@ import 
org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBu
 import org.apache.kylin.metadata.cube.model.NDataSegment
 import org.apache.kylin.metadata.cube.planner.CostBasePlannerUtils
 import org.apache.kylin.metadata.model._
-import org.apache.kylin.query.util.PushDownUtil
 import org.apache.spark.sql.KapFunctions.dict_encode_v3
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, expr}
@@ -64,6 +65,8 @@ import scala.concurrent.duration.{Duration, MILLISECONDS}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success, Try}
 
+import com.google.common.collect.Sets
+
 abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
                                     private val dataSegment: NDataSegment,
                                     private val buildParam: BuildParam)
@@ -318,12 +321,12 @@ abstract class FlatTableAndDictBase(private val 
jobContext: SegmentJob,
   }
 
   private def applyFilterCondition(originDS: Dataset[Row]): Dataset[Row] = {
-    if (StringUtils.isBlank(dataModel.getFilterCondition)) {
+    val filterCondition = dataModel.getFilterCondition
+    if (StringUtils.isBlank(filterCondition)) {
       logInfo(s"No available FILTER-CONDITION segment $segmentId")
       return originDS
     }
-    val expression = PushDownUtil.massageExpression(dataModel, project, 
dataModel.getFilterCondition, null)
-    val converted = replaceDot(expression, dataModel)
+    val converted = replaceDot(filterCondition, dataModel)
     val condition = s" (1=1) AND ($converted)"
     logInfo(s"Apply FILTER-CONDITION: $condition segment $segmentId")
     originDS.where(condition)
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala
index 04234294c9..cb8229e705 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala
@@ -17,7 +17,9 @@
  */
 package org.apache.kylin.engine.spark.smarter
 
-import com.google.common.collect.{Lists, Maps, Sets}
+import java.util
+import java.util.Collections
+
 import org.apache.commons.collections.CollectionUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
@@ -29,11 +31,11 @@ import org.apache.spark.sql.execution.utils.SchemaProcessor
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.{Dataset, Row, SparderEnv, SparkSession}
 
-import java.util
-import java.util.Collections
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import com.google.common.collect.{Lists, Maps, Sets}
+
 class IndexDependencyParser(val model: NDataModel) {
 
   private val ccTableNameAliasMap = Maps.newHashMap[String, util.Set[String]]
@@ -95,7 +97,7 @@ class IndexDependencyParser(val model: NDataModel) {
       model.getEffectiveMeasures.get(id).getFunction.getParameters == null)
   }
 
-  private def getTableIdentitiesFromColumn(ref: TblColRef) = {
+  private def getTableIdentitiesFromColumn(ref: TblColRef): 
util.HashSet[String] = {
     val desc = ref.getColumnDesc
     if (desc.isComputedColumn) {
       Sets.newHashSet(ccTableNameAliasMap.get(ref.getName))
@@ -172,18 +174,20 @@ class IndexDependencyParser(val model: NDataModel) {
     result
   }
 
-  private def initFilterConditionTableNames(originDf: Dataset[Row], colFields: 
Array[StructField]): Unit =
-    if (StringUtils.isNotEmpty(model.getFilterCondition)) {
-      val whereDs = 
originDf.selectExpr(NSparkCubingUtil.convertFromDotWithBackTick(model.getFilterCondition.replace("\"",
 "`")))
-      whereDs.schema.fields.foreach(whereField => {
-        colFields.foreach(colField => {
-          if (whereField.name.contains(colField.name)) {
-            val tableName = colField.name.substring(0, 
colField.name.indexOf(NSparkCubingUtil.SEPARATOR))
-            allTablesAlias.add(model.getTableNameMap.get(tableName).getAlias)
-          }
-        })
-      })
+  private def initFilterConditionTableNames(originDf: Dataset[Row], colFields: 
Array[StructField]): Unit = {
+    if (StringUtils.isBlank(model.getFilterCondition)) {
+      return
     }
+    val whereDs = 
originDf.selectExpr(NSparkCubingUtil.convertFromDotWithBackTick(model.getFilterCondition))
+    whereDs.schema.fields.foreach(whereField => {
+      colFields.foreach(colField => {
+        if (whereField.name.contains(colField.name)) {
+          val tableName = colField.name.substring(0, 
colField.name.indexOf(NSparkCubingUtil.SEPARATOR))
+          allTablesAlias.add(model.getTableNameMap.get(tableName).getAlias)
+        }
+      })
+    })
+  }
 
   private def initPartitionColumnTableNames(): Unit = {
     if (model.getPartitionDesc != null && 
model.getPartitionDesc.getPartitionDateColumnRef != null) {

Reply via email to