This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 42849e9e9470df939e2f49dd88acb4c680956093 Author: fanfanAlice <41991994+fanfanal...@users.noreply.github.com> AuthorDate: Mon Oct 31 20:37:56 2022 +0800 KYLIN-5385 add bigquery pushdown Co-authored-by: fanfanAlice <18611532...@163.com> --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../org/apache/kylin/query/util/KapQueryUtil.java | 27 ++++-- .../kylin/query/engine/QueryRoutingEngine.java | 18 +++- .../kylin/query/engine/QueryRoutingEngineTest.java | 54 +++++++++++- .../org/apache/kylin/query/util/QueryUtilTest.java | 95 +++++++++++++++++++++- 5 files changed, 187 insertions(+), 11 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index c917c96e7c..64606df279 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -3547,6 +3547,10 @@ public abstract class KylinConfigBase implements Serializable { return Integer.parseInt(this.getOptional("kylin.query.max-result-rows", "0")); } + public boolean isBigQueryPushDown() { + return Boolean.parseBoolean(this.getOptional("kylin.query.big-query-pushdown", FALSE)); + } + public Integer getLoadHiveTableWaitSparderSeconds() { return Integer.parseInt(this.getOptional("kylin.source.load-hive-table-wait-sparder-seconds", "900")); } diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java index 9f66b2208a..e3226c8979 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java @@ -42,6 +42,7 @@ import org.apache.calcite.sql.SqlOrderBy; import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.util.Util; import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.exception.KylinTimeoutException; @@ -55,6 +56,7 @@ import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.query.BigQueryThresholdUpdater; import org.apache.kylin.query.SlowQueryDetector; import org.apache.kylin.query.exception.UserStopQueryException; import org.apache.kylin.query.relnode.KapJoinRel; @@ -75,6 +77,8 @@ public class KapQueryUtil { public static final String DEFAULT_SCHEMA = "DEFAULT"; public static final ImmutableSet<String> REMOVED_TRANSFORMERS = ImmutableSet.of("ReplaceStringWithVarchar"); + public static final String JDBC = "jdbc"; + public static List<IQueryTransformer> queryTransformers = Collections.emptyList(); public static List<IPushDownConverter> pushDownConverters = Collections.emptyList(); @@ -338,6 +342,20 @@ public class KapQueryUtil { limit = maxRows; } + // https://issues.apache.org/jira/browse/KYLIN-2649 + if (kylinConfig.getForceLimit() > 0 && limit <=0 && !sql.toLowerCase(Locale.ROOT).contains("limit") + && sql.toLowerCase(Locale.ROOT).matches("^select\\s+\\*\\p{all}*")) { + limit = kylinConfig.getForceLimit(); + } + + if (checkBigQueryPushDown(kylinConfig)) { + long bigQueryThreshold = BigQueryThresholdUpdater.getBigQueryThreshold(); + if (limit <=0 && bigQueryThreshold > 0) { + log.info("Big query route to pushdown, Add limit {} to sql.", bigQueryThreshold); + limit = (int) bigQueryThreshold; + } + } + if (limit > 0 && !sqlElements.contains("limit")) { sql += ("\nLIMIT " + limit); } @@ -346,14 +364,13 @@ public class KapQueryUtil { sql += ("\nOFFSET " + offset); } - // https://issues.apache.org/jira/browse/KYLIN-2649 - if (kylinConfig.getForceLimit() > 0 && !sql.toLowerCase(Locale.ROOT).contains("limit") - && sql.toLowerCase(Locale.ROOT).matches("^select\\s+\\*\\p{all}*")) { - sql += ("\nLIMIT " + kylinConfig.getForceLimit()); - } return sql; } + public static boolean checkBigQueryPushDown(KylinConfig kylinConfig) { + return kylinConfig.isBigQueryPushDown() && JDBC.equals(KapConfig.getInstanceFromEnv().getShareStateSwitchImplement()); + } + public static void initQueryTransformersIfNeeded(KylinConfig kylinConfig, boolean isCCNeeded) { String[] currentTransformers = queryTransformers.stream().map(Object::getClass).map(Class::getCanonicalName) .toArray(String[]::new); diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java index 0acea31fb8..a7e85c9056 100644 --- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java +++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java @@ -141,7 +141,11 @@ public class QueryRoutingEngine { NProjectLoader.removeCache(); return queryWithSqlMassage(queryParams); } else { - throw e; + if (e.getCause() instanceof NewQueryRefuseException && shouldPushdown(e, queryParams)) { + return pushDownQuery(e, queryParams); + } else { + throw e; + } } } if (shouldPushdown(e, queryParams)) { @@ -179,7 +183,7 @@ public class QueryRoutingEngine { } if (e.getCause() instanceof NewQueryRefuseException) { - return false; + return checkBigQueryPushDown(queryParams); } return e instanceof SQLException && !e.getMessage().contains(SPARK_MEM_LIMIT_EXCEEDED); @@ -210,6 +214,16 @@ public class QueryRoutingEngine { return queryResult; } + private boolean checkBigQueryPushDown(QueryParams queryParams) { + KylinConfig kylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()) + .getProject(queryParams.getProject()).getConfig(); + boolean isPush = KapQueryUtil.checkBigQueryPushDown(kylinConfig); + if (isPush) { + logger.info("Big query route to pushdown."); + } + return isPush; + } + private QueryResult pushDownQuery(SQLException sqlException, QueryParams queryParams) throws SQLException { QueryContext.current().getMetrics().setOlapCause(sqlException); QueryContext.current().getQueryTagInfo().setPushdown(true); diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java index 708a8e3630..f929d50a39 100644 --- a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java @@ -29,16 +29,18 @@ import java.sql.Timestamp; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.exception.NewQueryRefuseException; import org.apache.kylin.common.exception.QueryErrorCode; import org.apache.kylin.common.exception.TargetSegmentNotFoundException; import org.apache.kylin.common.persistence.InMemResourceStore; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException; -import org.apache.kylin.query.util.QueryParams; -import org.apache.kylin.source.adhocquery.PushdownResult; import org.apache.kylin.common.persistence.transaction.TransactionException; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException; import org.apache.kylin.query.QueryExtension; +import org.apache.kylin.query.engine.data.QueryResult; +import org.apache.kylin.query.util.QueryParams; +import org.apache.kylin.source.adhocquery.PushdownResult; import org.apache.spark.SparkException; import org.junit.After; import org.junit.Assert; @@ -256,4 +258,50 @@ public class QueryRoutingEngineTest extends NLocalFileMetadataTestCase { QueryContext.current().getMetrics().setRetryTimes(0); } + @Test + public void testNewQueryRefuseException() throws Exception { + final String sql = "select * from success_table_2"; + final String project = "default"; + KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv(); + QueryParams queryParams = new QueryParams(); + queryParams.setProject(project); + queryParams.setSql(sql); + queryParams.setKylinConfig(kylinconfig); + queryParams.setSelect(true); + + Mockito.doThrow(new SQLException("", + new NewQueryRefuseException("Refuse new big query, sum of source_scan_rows is 10, " + + "refuse query threshold is 10. Current step: Collecting dataset for sparder. "))) + .when(queryRoutingEngine).execute(Mockito.anyString(), Mockito.any()); + + try { + queryRoutingEngine.queryWithSqlMassage(queryParams); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof NewQueryRefuseException); + Assert.assertFalse(QueryContext.current().getQueryTagInfo().isPushdown()); + } + + kylinconfig.setProperty("kylin.query.share-state-switch-implement", "jdbc"); + kylinconfig.setProperty("kylin.query.big-query-source-scan-rows-threshold", "10"); + kylinconfig.setProperty("kylin.query.big-query-pushdown", "true"); + queryParams.setKylinConfig(kylinconfig); + + Mockito.doThrow(new SQLException("", + new NewQueryRefuseException("Refuse new big query, sum of source_scan_rows is 10, " + + "refuse query threshold is 10. Current step: Collecting dataset for sparder. "))) + .when(queryRoutingEngine).execute(Mockito.anyString(), Mockito.any()); + try { + queryRoutingEngine.queryWithSqlMassage(queryParams); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof NewQueryRefuseException); + Assert.assertTrue(QueryContext.current().getQueryTagInfo().isPushdown()); + } + Mockito.doAnswer(invocation -> { + pushdownCount++; + Assert.assertTrue(ResourceStore.getKylinMetaStore(kylinconfig) instanceof InMemResourceStore); + return PushdownResult.emptyResult(); + }).when(queryRoutingEngine).tryPushDownSelectQuery(Mockito.any(), Mockito.any(), Mockito.anyBoolean()); + QueryResult queryResult = queryRoutingEngine.queryWithSqlMassage(queryParams); + Assert.assertEquals(0, queryResult.getSize()); + } } diff --git a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java index 203eac75b4..cca7fca458 100644 --- a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java @@ -24,16 +24,18 @@ import java.util.Properties; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig; -import org.apache.kylin.query.security.AccessDeniedException; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.model.ComputedColumnDesc; import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.query.security.AccessDeniedException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import io.kyligence.kap.query.util.KapQueryUtil; + public class QueryUtilTest extends NLocalFileMetadataTestCase { @Before @@ -426,4 +428,95 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase { Assert.assertEquals(expectedSql, massagedSql); } } + + @Test + public void testBigQueryPushDown() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty("kylin.query.share-state-switch-implement", "jdbc"); + config.setProperty("kylin.query.big-query-source-scan-rows-threshold", "10"); + config.setProperty("kylin.query.big-query-pushdown", "true"); + String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID"; + QueryParams queryParams1 = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true); + String newSql1 = KapQueryUtil.massageSql(queryParams1); + Assert.assertEquals( + "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n" + + "LIMIT 10", + newSql1); + } + + @Test + public void testBigQueryPushDownByParams() { + KylinConfig config = KylinConfig.createKylinConfig(new Properties()); + String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID"; + QueryParams queryParams1 = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true); + String newSql1 = KapQueryUtil.massageSql(queryParams1); + Assert.assertEquals( + "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID", + newSql1); + String sql = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID"; + QueryParams queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true); + String targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals( + "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID", + targetSQL); + queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals( + "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n" + + "LIMIT 1", + targetSQL); + config.setProperty("kylin.query.max-result-rows", "2"); + queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals( + "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n" + + "LIMIT 2", + targetSQL); + queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals( + "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n" + + "LIMIT 1", + targetSQL); + config.setProperty("kylin.query.max-result-rows", "-1"); + config.setProperty("kylin.query.force-limit", "3"); + queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals( + "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID", + targetSQL); + queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals( + "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n" + + "LIMIT 1", + targetSQL); + sql1 = "select * from table1"; + queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals("select * from table1" + "\n" + "LIMIT 3", targetSQL); + queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals("select * from table1" + "\n" + "LIMIT 2", targetSQL); + sql1 = "select * from table1 limit 4"; + queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals("select * from table1 limit 4", targetSQL); + queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals("select * from table1 limit 4", targetSQL); + config.setProperty("kylin.query.force-limit", "-1"); + config.setProperty("kylin.query.share-state-switch-implement", "jdbc"); + queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals( + "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID", + targetSQL); + config.setProperty("kylin.query.big-query-pushdown", "true"); + queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true); + targetSQL = KapQueryUtil.massageSql(queryParams); + Assert.assertEquals( + "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID", + targetSQL); + } }