This is an automated email from the ASF dual-hosted git repository.
caolu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
     new d0433ae8cb KYLIN-6063 Fix KylinStorageScanExec fallback
d0433ae8cb is described below

commit d0433ae8cbeb1860592c38fc9cbaa42303342558
Author: Shuai li <loney...@live.cn>
AuthorDate: Tue Jan 7 10:13:07 2025 +0800

    KYLIN-6063 Fix KylinStorageScanExec fallback
---
 .../apache/kylin/gluten/GlutenKEConfigTest.java | 11 +++++
 .../apache/kylin/util/QueryResultComparator.java | 2 +-
 .../query/sql_window/new_sql_window/query20.sql | 10 ++---
 .../test/resources/query/sql_window/query01.sql | 4 +-
 .../test/resources/query/sql_window/query02.sql | 3 +-
 .../test/resources/query/sql_window/query03.sql | 7 +--
 .../test/resources/query/sql_window/query04.sql | 3 +-
 .../test/resources/query/sql_window/query05.sql | 1 +
 .../test/resources/query/sql_window/query06.sql | 1 +
 .../test/resources/query/sql_window/query07.sql | 1 +
 .../test/resources/query/sql_window/query08.sql | 1 +
 .../query/sql_window/query08.sql.expected | 1 +
 .../test/resources/query/sql_window/query09.sql | 1 +
 .../query/sql_window/query09.sql.expected | 1 +
 .../test/resources/query/sql_window/query12.sql | 1 +
 .../query/sql_window/query12.sql.expected | 1 +
 .../test/resources/query/sql_window/query13.sql | 1 +
 .../spark/NLocalWithSparkSessionTestBase.java | 2 +
 src/spark-project/kylin-internal-catalog/pom.xml | 6 +++
 .../scala/org/apache/spark/sql/SparderEnv.scala | 2 +-
 src/spark-project/spark-common/pom.xml | 6 +++
 .../datasource/KylinDeltaSourceStrategy.scala | 2 +-
 .../ConvertKylinFileSourceToGlutenRule.scala | 16 ++++++-
 .../gluten/KylinStorageScanExecTransformer.scala | 47 ++++++++++++++++++++
 .../apache/spark/sql/common/GlutenTestConfig.scala | 14 +++++-
 .../execution/KylinFileSourceScanExecSuite.scala | 51 ++++++++++++++++++++--
 26 files changed, 176 insertions(+), 20 deletions(-)

diff --git a/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java index bb414d2323..400b99c418 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java @@ -18,10 +18,12 @@ package org.apache.kylin.gluten; +import org.apache.gluten.execution.FilterExecTransformer; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.query.engine.QueryExec; import org.apache.kylin.rec.util.AccelerationUtil; import org.apache.kylin.util.SuggestTestBase; +import org.apache.spark.sql.common.GlutenTestConfig; import org.junit.Assert; import org.junit.Test; @@ -41,6 +43,15 @@ public class GlutenKEConfigTest extends SuggestTestBase { Assert.assertTrue(resultSet.getSize() > 0); } + @Test + public void testGlutenIsEnable() throws Exception { + if (!GlutenTestConfig.enableGluten()) { + return; + } + val a = ss.range(100).filter("id < 5").queryExecution().executedPlan(); + Assert.assertTrue(a.find(FilterExecTransformer.class::isInstance).nonEmpty()); + } + private void proposeAndBuildIndex(String[] sqls) throws InterruptedException { AccelerationUtil.runWithSmartContext(KylinConfig.getInstanceFromEnv(), PROJECT, sqls, true); buildAllModels(KylinConfig.getInstanceFromEnv(), PROJECT); diff --git a/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java b/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java index 4fe6a5840b..0c4c6f4d74 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java @@ -129,7 +129,7 @@ public class QueryResultComparator { private static void printRows(String source, List<String> rows) { log.info("***********" + source + " start, only show top 100 result**********"); - rows.stream().limit(100).forEach(log::info); + rows.stream().limit(11000).forEach(log::info); 
log.info("***********" + source + " end**********"); } } diff --git a/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql b/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql index bec5dcabaf..91566ae288 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql @@ -22,9 +22,9 @@ -- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -- select -lag(cal_dt,50,date'2019-01-01') over (partition by seller_id order by item_count) -,lag(cal_dt,50,'2019-01-01') over (partition by seller_id order by item_count) -,lead(item_count,1,'2019') over (partition by seller_id order by cal_dt) -,lead(item_count,1,2019) over (partition by seller_id order by cal_dt) -,lead(price,1,1.234) over (partition by seller_id order by cal_dt) +lag(cal_dt,50,date'2019-01-01') over (partition by seller_id order by item_count,order_id) +,lag(cal_dt,50,'2019-01-01') over (partition by seller_id order by item_count,order_id) +,lead(item_count,1,'2019') over (partition by seller_id order by cal_dt,order_id) +,lead(item_count,1,2019) over (partition by seller_id order by cal_dt,order_id) +,lead(price,1,1.234) over (partition by seller_id order by cal_dt,order_id) from test_kylin_fact \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/query/sql_window/query01.sql b/src/kylin-it/src/test/resources/query/sql_window/query01.sql index a41bad7a67..f309664061 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query01.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/query01.sql @@ -16,6 +16,8 @@ -- limitations under the License. 
-- -select lstg_format_name, sum(price) as GMV, count(lstg_format_name) over(partition by lstg_format_name order by cal_dt, lstg_format_name) +select lstg_format_name, sum(price) as GMV, + count(lstg_format_name) over(partition by lstg_format_name order by cal_dt) cnt from test_kylin_fact group by cal_dt, lstg_format_name +order by lstg_format_name, GMV, cnt diff --git a/src/kylin-it/src/test/resources/query/sql_window/query02.sql b/src/kylin-it/src/test/resources/query/sql_window/query02.sql index dd00ce7f71..3ac915e061 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query02.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/query02.sql @@ -16,6 +16,7 @@ -- limitations under the License. -- -select lstg_format_name,round(avg(sum(price)) over(partition by lstg_format_name order by cal_dt, lstg_format_name),0) +select lstg_format_name,round(avg(sum(price)) over(partition by lstg_format_name order by cal_dt, lstg_format_name),0) rd from test_kylin_fact group by cal_dt, lstg_format_name +order by lstg_format_name,rd diff --git a/src/kylin-it/src/test/resources/query/sql_window/query03.sql b/src/kylin-it/src/test/resources/query/sql_window/query03.sql index b9e38362cc..2337aa3cf8 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query03.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/query03.sql @@ -17,8 +17,9 @@ -- select lstg_format_name, -sum(sum(price)) over(partition by lstg_format_name order by cal_dt, lstg_format_name), -max(sum(price)) over(partition by lstg_format_name order by cal_dt, lstg_format_name), -min(sum(price)) over(partition by lstg_format_name order by cal_dt, lstg_format_name) +sum(sum(price)) over(partition by lstg_format_name order by cal_dt, lstg_format_name) sp, +max(sum(price)) over(partition by lstg_format_name order by cal_dt, lstg_format_name) mp, +min(sum(price)) over(partition by lstg_format_name order by cal_dt, lstg_format_name) mnp from test_kylin_fact group by cal_dt, lstg_format_name 
+order by lstg_format_name,sp,mp,mnp diff --git a/src/kylin-it/src/test/resources/query/sql_window/query04.sql b/src/kylin-it/src/test/resources/query/sql_window/query04.sql index 648b126767..1917a779bd 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query04.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/query04.sql @@ -16,8 +16,9 @@ -- limitations under the License. -- -select cal_dt, lstg_format_name, sum(price), +select cal_dt, lstg_format_name, sum(price) sp, rank() over(partition by lstg_format_name order by cal_dt, lstg_format_name) as "RANK", dense_rank() over(partition by lstg_format_name order by cal_dt, lstg_format_name) as "DENSE_RANK" from test_kylin_fact group by cal_dt, lstg_format_name +order by cal_dt,lstg_format_name,sp,"RANK","DENSE_RANK" diff --git a/src/kylin-it/src/test/resources/query/sql_window/query05.sql b/src/kylin-it/src/test/resources/query/sql_window/query05.sql index 41fa43e564..7df37ba3f8 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query05.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/query05.sql @@ -25,3 +25,4 @@ ntile(4) over (partition by lstg_format_name order by cal_dt) as "quarter" from test_kylin_fact where cal_dt < '2012-02-01' group by cal_dt, lstg_format_name +order by cal_dt, lstg_format_name,"first","current","prev","next","quarter" \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/query/sql_window/query06.sql b/src/kylin-it/src/test/resources/query/sql_window/query06.sql index 6f9c6877ba..637f88c644 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query06.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/query06.sql @@ -22,3 +22,4 @@ when 0.0 then 0 else sum(price)/lag(sum(price)) over(partition by lstg_format_na from test_kylin_fact where cal_dt < '2012-02-01' group by cal_dt, lstg_format_name +order by cal_dt, lstg_format_name, GMV, "prev" diff --git a/src/kylin-it/src/test/resources/query/sql_window/query07.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query07.sql index eef5a37594..a1d538b414 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query07.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/query07.sql @@ -22,3 +22,4 @@ last_value(sum(price)) over (partition by lstg_format_name order by cal_dt rows from test_kylin_fact where cal_dt < '2012-02-01' group by cal_dt, lstg_format_name +order by cal_dt, lstg_format_name, GMV, "prev 2 rows", "next 2 rows" diff --git a/src/kylin-it/src/test/resources/query/sql_window/query08.sql b/src/kylin-it/src/test/resources/query/sql_window/query08.sql index e239e87b72..b2615b2c41 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query08.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/query08.sql @@ -22,3 +22,4 @@ last_value(sum(price)) over (partition by lstg_format_name order by cast(cal_dt from test_kylin_fact where cal_dt < '2012-02-01' group by cal_dt, lstg_format_name +order by cal_dt, lstg_format_name, GMV, "prev 3 days", "next 3 days" diff --git a/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected b/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected index 62bf7bd9b7..0051ef33fb 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected +++ b/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected @@ -21,3 +21,4 @@ last_value(sum(price)) over (partition by lstg_format_name order by cast(cast(ca from test_kylin_fact where cal_dt < '2012-02-01' group by cal_dt, lstg_format_name +order by cal_dt, lstg_format_name, GMV, "prev 3 days", "next 3 days" diff --git a/src/kylin-it/src/test/resources/query/sql_window/query09.sql b/src/kylin-it/src/test/resources/query/sql_window/query09.sql index c28318d180..a4c304c111 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query09.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/query09.sql @@ -25,3 +25,4 @@ select * from ( group by cal_dt, 
lstg_format_name )t where cal_dt between '2013-01-06' and '2013-01-15' +order by cal_dt, lstg_format_name, GMV, "last_day", "last_year" diff --git a/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected b/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected index 0fe1d6bba3..c9e4985342 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected +++ b/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected @@ -27,3 +27,4 @@ with tmptb as (select cal_dt, lstg_format_name, sum(price) as GMV, select * from tmptb where cal_dt between '2013-01-06' and '2013-01-15' +order by cal_dt, lstg_format_name, GMV, "last_day", "last_year" diff --git a/src/kylin-it/src/test/resources/query/sql_window/query12.sql b/src/kylin-it/src/test/resources/query/sql_window/query12.sql index b9f23ead11..7e795ee055 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query12.sql +++ b/src/kylin-it/src/test/resources/query/sql_window/query12.sql @@ -24,3 +24,4 @@ select * from( group by cal_dt, lstg_format_name,SLR_SEGMENT_CD )t where cal_dt between '2013-01-06' and '2013-01-15' +order by cal_dt, lstg_format_name, GMV, "last_day", "last_year" diff --git a/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected b/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected index 8fd51963bf..64ff8b47f9 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected +++ b/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected @@ -24,3 +24,4 @@ select * from( group by cal_dt, lstg_format_name,SLR_SEGMENT_CD )t where cal_dt between '2013-01-06' and '2013-01-15' +order by cal_dt, lstg_format_name, GMV, "last_day", "last_year" diff --git a/src/kylin-it/src/test/resources/query/sql_window/query13.sql b/src/kylin-it/src/test/resources/query/sql_window/query13.sql index 9aff463519..621bd3673b 100644 --- a/src/kylin-it/src/test/resources/query/sql_window/query13.sql +++ 
b/src/kylin-it/src/test/resources/query/sql_window/query13.sql @@ -25,3 +25,4 @@ ntile(4) over (partition by lstg_format_name order by cal_dt) as "quarter" from test_kylin_fact where cal_dt < '2012-02-01' group by cal_dt, lstg_format_name +order by cal_dt, lstg_format_name,GMV,"first","current","prev","next","quarter" diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java index f90f597f25..4baac3673a 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java @@ -47,6 +47,7 @@ import org.apache.spark.sql.SparderEnv; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.optimizer.ConvertInnerJoinToSemiJoin; import org.apache.spark.sql.common.GlutenTestConfig; +import org.apache.spark.sql.execution.datasource.KylinDeltaSourceStrategy; import org.apache.spark.sql.internal.StaticSQLConf; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; @@ -123,6 +124,7 @@ public class NLocalWithSparkSessionTestBase extends NLocalFileMetadataTestCase i cleanupAnyExistingSession(); ss = SparkSession.builder().withExtensions(ext -> { ext.injectOptimizerRule(ss -> new ConvertInnerJoinToSemiJoin()); + ext.injectPlannerStrategy(ss -> new KylinDeltaSourceStrategy()); return null; }).config(sparkConf).getOrCreate(); SparderEnv.setSparkSession(ss); diff --git a/src/spark-project/kylin-internal-catalog/pom.xml b/src/spark-project/kylin-internal-catalog/pom.xml index bc8387d71a..f67ee74c4e 100644 --- a/src/spark-project/kylin-internal-catalog/pom.xml +++ b/src/spark-project/kylin-internal-catalog/pom.xml @@ -79,6 +79,12 @@ </exclusions> <scope>provided</scope> </dependency> + 
<dependency> + <groupId>org.apache.gluten</groupId> + <artifactId>gluten-iceberg</artifactId> + <version>${gluten.version}</version> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-catalyst_2.12</artifactId> diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala index 01a16902c3..ef18bc8700 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala @@ -308,7 +308,7 @@ object SparderEnv extends Logging { def injectExtensions(sse: SparkSessionExtensions): Unit = { sse.injectPlannerStrategy(_ => KylinSourceStrategy) sse.injectPlannerStrategy(_ => LayoutFileSourceStrategy) - sse.injectPlannerStrategy(_ => KylinDeltaSourceStrategy) + sse.injectPlannerStrategy(_ => new KylinDeltaSourceStrategy) sse.injectPostHocResolutionRule(HiveStorageRule) sse.injectOptimizerRule(_ => new ConvertInnerJoinToSemiJoin()) if (KapConfig.getInstanceFromEnv.isConstraintPropagationEnabled) { diff --git a/src/spark-project/spark-common/pom.xml b/src/spark-project/spark-common/pom.xml index 7e3e0b738c..deb32d34d0 100644 --- a/src/spark-project/spark-common/pom.xml +++ b/src/spark-project/spark-common/pom.xml @@ -193,6 +193,12 @@ <version>${gluten.version}</version> <scope>${gluten.deps.scope}</scope> </dependency> + <dependency> + <groupId>org.apache.gluten</groupId> + <artifactId>gluten-iceberg</artifactId> + <version>${gluten.version}</version> + <scope>${gluten.deps.scope}</scope> + </dependency> <dependency> <groupId>org.apache.gluten</groupId> <artifactId>spark-sql-columnar-shims-kyspark</artifactId> diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala index 67892f2912..02b6d21d13 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala @@ -56,7 +56,7 @@ import org.apache.spark.util.collection.BitSet * is under the threshold with the addition of the next file, add it. If not, open a new bucket * and add it. Proceed to the next file. */ -object KylinDeltaSourceStrategy extends Strategy with PredicateHelper with Logging { +class KylinDeltaSourceStrategy extends Strategy with PredicateHelper with Logging { // should prune buckets iff num buckets is greater than 1 and there is only one bucket column private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean = { diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala index e1b59b8831..d122f945dc 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala @@ -22,7 +22,7 @@ import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenPlan, V import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.utils.PushDownUtil -import org.apache.spark.sql.execution.{KylinFileSourceScanExec, LayoutFileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.{KylinFileSourceScanExec, KylinStorageScanExec, LayoutFileSourceScanExec, SparkPlan} class 
ConvertKylinFileSourceToGlutenRule(val session: SparkSession) extends Rule[SparkPlan] { @@ -72,5 +72,19 @@ class ConvertKylinFileSourceToGlutenRule(val session: SparkSession) extends Rule ) // Transformer validate tryReturnGlutenPlan(transformer, l) + case v3Scan: KylinStorageScanExec => + // convert to Gluten transformer + val transformer = new KylinStorageScanExecTransformer( + v3Scan.relation, + v3Scan.output, + v3Scan.requiredSchema, + v3Scan.partitionFilters, + v3Scan.optionalBucketSet, + v3Scan.optionalNumCoalescedBuckets, + PushDownUtil.removeNotSupportPushDownFilters(v3Scan.conf, v3Scan.output, v3Scan.dataFilters), + v3Scan.tableIdentifier, + v3Scan.disableBucketedScan) + + tryReturnGlutenPlan(transformer, v3Scan) } } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinStorageScanExecTransformer.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinStorageScanExecTransformer.scala new file mode 100644 index 0000000000..1677309fd9 --- /dev/null +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinStorageScanExecTransformer.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.gluten + +import org.apache.gluten.execution.FileSourceScanExecTransformer +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.collection.BitSet + +class KylinStorageScanExecTransformer(@transient relation: HadoopFsRelation, + output: Seq[Attribute], + requiredSchema: StructType, + partitionFilters: Seq[Expression], + optionalBucketSet: Option[BitSet], + optionalNumCoalescedBuckets: Option[Int], + dataFilters: Seq[Expression], + tableIdentifier: Option[TableIdentifier], + disableBucketedScan: Boolean = false) extends FileSourceScanExecTransformer( + relation, + output, + requiredSchema, + partitionFilters, + optionalBucketSet, + optionalNumCoalescedBuckets, + dataFilters, + tableIdentifier, + disableBucketedScan) { + +} diff --git a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala index 47bc43014b..36d7378760 100644 --- a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala +++ b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala @@ -27,12 +27,22 @@ object GlutenTestConfig extends Logging { private val GLUTEN_CH_LIB_PATH_KEY = "clickhouse.lib.path" - def configGluten(conf: SparkConf): Unit = { + def enableGluten(): Boolean = { val chLibPath = System.getProperty(GLUTEN_CH_LIB_PATH_KEY) if (StringUtils.isEmpty(chLibPath) || !new File(chLibPath).exists) { log.warn("-Dclickhouse.lib.path is not set or path not exists, skip gluten config") - return // skip + false // skip + 
} else { + true } + } + + def configGluten(conf: SparkConf): Unit = { + if (!enableGluten()) { + return + } + + val chLibPath = System.getProperty(GLUTEN_CH_LIB_PATH_KEY) conf.set("spark.gluten.enabled", "true") conf.set("spark.plugins", "org.apache.gluten.GlutenPlugin") conf.set("spark.gluten.sql.columnar.libpath", chLibPath) diff --git a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala index 970ff99076..af601c371b 100644 --- a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala +++ b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala @@ -17,8 +17,11 @@ package org.apache.spark.sql.execution +import java.io.File import java.util.concurrent.TimeUnit +import org.apache.commons.lang3.StringUtils +import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform import org.apache.hadoop.fs.Path import org.apache.kylin.cache.softaffinity.SoftAffinityConstants import org.apache.spark.SparkFunSuite @@ -31,8 +34,9 @@ import org.apache.spark.sql.common.LocalMetadata import org.apache.spark.sql.delta.KylinDeltaLogFileIndex import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.datasource.{FilePruner, KylinDeltaSourceStrategy, KylinSourceStrategy, LayoutFileSourceStrategy} -import org.apache.spark.sql.execution.datasources.{CacheFileScanRDD, FileIndex, FileScanRDD, HadoopFsRelation, LogicalRelation, PartitionDirectory} +import org.apache.spark.sql.execution.datasource._ +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.gluten.KylinStorageScanExecTransformer import org.mockito.{ArgumentMatchers, Mockito} import 
com.google.common.cache.CacheBuilder @@ -56,7 +60,7 @@ class KylinFileSourceScanExecSuite extends SparkFunSuite .withExtensions { ext => ext.injectPlannerStrategy(_ => KylinSourceStrategy) ext.injectPlannerStrategy(_ => LayoutFileSourceStrategy) - ext.injectPlannerStrategy(_ => KylinDeltaSourceStrategy) + ext.injectPlannerStrategy(_ => new KylinDeltaSourceStrategy) } .getOrCreate() @@ -80,6 +84,47 @@ class KylinFileSourceScanExecSuite extends SparkFunSuite spark.sparkContext.stop() } + test("[Gluten] Create sharding read RDD with Soft affinity - CacheFileScanRDD") { + val chLibPath = System.getProperty("clickhouse.lib.path") + if (StringUtils.isEmpty(chLibPath) || !new File(chLibPath).exists) { + log.warn("-Dclickhouse.lib.path is not set or path not exists, skip gluten config") + } else { + SparkSession.cleanupAnyExistingSession() + val spark = SparkSession.builder() + .master("local[1]") + .config(SoftAffinityConstants.PARAMS_KEY_SOFT_AFFINITY_ENABLED, "true") + .config("spark.sql.shuffle.partitions", "1") + .config("spark.sql.adaptive.enabled", "false") + .config("spark.plugins", "org.apache.gluten.GlutenPlugin") + .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .config("spark.io.compression.codec", "LZ4") + .config("spark.gluten.sql.columnar.libpath", chLibPath) + .config("spark.gluten.sql.enable.native.validation", "false") + .config("spark.memory.offHeap.enabled", "true") + .config("spark.memory.offHeap.size", "2G") + .config("spark.gluten.sql.columnar.extended.columnar.pre.rules", + "org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule") + .withExtensions { ext => + ext.injectPlannerStrategy(_ => KylinSourceStrategy) + ext.injectPlannerStrategy(_ => LayoutFileSourceStrategy) + ext.injectPlannerStrategy(_ => new KylinDeltaSourceStrategy) + } + .getOrCreate() + + withTempPath { path => + val tempDir = path.getCanonicalPath + val df = createSimpleFileDeltaDF(spark, tempDir) + val transformed = 
HeuristicTransform.static()(df.queryExecution.executedPlan) + val res = transformed.collect { + case p: KylinStorageScanExecTransformer => p + } + assertResult(1)(res.size) + } + + spark.sparkContext.stop() + } + } + test("Create sharding read RDD without Soft affinity - FileScanRDD") { withTempPath { path => SparkSession.cleanupAnyExistingSession()