This is an automated email from the ASF dual-hosted git repository.

caolu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new d0433ae8cb KYLIN-6063 Fix KylinStorageScanExec fallback
d0433ae8cb is described below

commit d0433ae8cbeb1860592c38fc9cbaa42303342558
Author: Shuai li <loney...@live.cn>
AuthorDate: Tue Jan 7 10:13:07 2025 +0800

    KYLIN-6063 Fix KylinStorageScanExec fallback
---
 .../apache/kylin/gluten/GlutenKEConfigTest.java    | 11 +++++
 .../apache/kylin/util/QueryResultComparator.java   |  2 +-
 .../query/sql_window/new_sql_window/query20.sql    | 10 ++---
 .../test/resources/query/sql_window/query01.sql    |  4 +-
 .../test/resources/query/sql_window/query02.sql    |  3 +-
 .../test/resources/query/sql_window/query03.sql    |  7 +--
 .../test/resources/query/sql_window/query04.sql    |  3 +-
 .../test/resources/query/sql_window/query05.sql    |  1 +
 .../test/resources/query/sql_window/query06.sql    |  1 +
 .../test/resources/query/sql_window/query07.sql    |  1 +
 .../test/resources/query/sql_window/query08.sql    |  1 +
 .../query/sql_window/query08.sql.expected          |  1 +
 .../test/resources/query/sql_window/query09.sql    |  1 +
 .../query/sql_window/query09.sql.expected          |  1 +
 .../test/resources/query/sql_window/query12.sql    |  1 +
 .../query/sql_window/query12.sql.expected          |  1 +
 .../test/resources/query/sql_window/query13.sql    |  1 +
 .../spark/NLocalWithSparkSessionTestBase.java      |  2 +
 src/spark-project/kylin-internal-catalog/pom.xml   |  6 +++
 .../scala/org/apache/spark/sql/SparderEnv.scala    |  2 +-
 src/spark-project/spark-common/pom.xml             |  6 +++
 .../datasource/KylinDeltaSourceStrategy.scala      |  2 +-
 .../ConvertKylinFileSourceToGlutenRule.scala       | 16 ++++++-
 .../gluten/KylinStorageScanExecTransformer.scala   | 47 ++++++++++++++++++++
 .../apache/spark/sql/common/GlutenTestConfig.scala | 14 +++++-
 .../execution/KylinFileSourceScanExecSuite.scala   | 51 ++++++++++++++++++++--
 26 files changed, 176 insertions(+), 20 deletions(-)

diff --git a/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java b/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java
index bb414d2323..400b99c418 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.kylin.gluten;
 
+import org.apache.gluten.execution.FilterExecTransformer;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.query.engine.QueryExec;
 import org.apache.kylin.rec.util.AccelerationUtil;
 import org.apache.kylin.util.SuggestTestBase;
+import org.apache.spark.sql.common.GlutenTestConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -41,6 +43,15 @@ public class GlutenKEConfigTest extends SuggestTestBase {
         Assert.assertTrue(resultSet.getSize() > 0);
     }
 
+    @Test
+    public void testGlutenIsEnable() throws Exception {
+        if (!GlutenTestConfig.enableGluten()) {
+            return;
+        }
+        val a = ss.range(100).filter("id < 5").queryExecution().executedPlan();
+        Assert.assertTrue(a.find(FilterExecTransformer.class::isInstance).nonEmpty());
+    }
+
     private void proposeAndBuildIndex(String[] sqls) throws InterruptedException {
         AccelerationUtil.runWithSmartContext(KylinConfig.getInstanceFromEnv(), PROJECT, sqls, true);
         buildAllModels(KylinConfig.getInstanceFromEnv(), PROJECT);
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java b/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java
index 4fe6a5840b..0c4c6f4d74 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java
@@ -129,7 +129,7 @@ public class QueryResultComparator {
 
     private static void printRows(String source, List<String> rows) {
         log.info("***********" + source + " start, only show top 100 result**********");
-        rows.stream().limit(100).forEach(log::info);
+        rows.stream().limit(11000).forEach(log::info);
         log.info("***********" + source + " end**********");
     }
 }
diff --git 
a/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql 
b/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql
index bec5dcabaf..91566ae288 100644
--- 
a/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql
+++ 
b/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql
@@ -22,9 +22,9 @@
 -- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 --
 select
-lag(cal_dt,50,date'2019-01-01') over (partition by seller_id order by item_count)
-,lag(cal_dt,50,'2019-01-01') over (partition by seller_id order by item_count)
-,lead(item_count,1,'2019') over (partition by seller_id order by cal_dt)
-,lead(item_count,1,2019) over (partition by seller_id order by cal_dt)
-,lead(price,1,1.234) over (partition by seller_id order by cal_dt)
+lag(cal_dt,50,date'2019-01-01') over (partition by seller_id order by item_count,order_id)
+,lag(cal_dt,50,'2019-01-01') over (partition by seller_id order by item_count,order_id)
+,lead(item_count,1,'2019') over (partition by seller_id order by cal_dt,order_id)
+,lead(item_count,1,2019) over (partition by seller_id order by cal_dt,order_id)
+,lead(price,1,1.234) over (partition by seller_id order by cal_dt,order_id)
 from test_kylin_fact
\ No newline at end of file
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query01.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query01.sql
index a41bad7a67..f309664061 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query01.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query01.sql
@@ -16,6 +16,8 @@
 -- limitations under the License.
 --
 
-select lstg_format_name, sum(price) as GMV, count(lstg_format_name) 
over(partition by lstg_format_name order by cal_dt, lstg_format_name)
+select lstg_format_name, sum(price) as GMV,
+       count(lstg_format_name) over(partition by lstg_format_name order by 
cal_dt) cnt
 from test_kylin_fact
 group by cal_dt, lstg_format_name
+order by lstg_format_name, GMV, cnt
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query02.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query02.sql
index dd00ce7f71..3ac915e061 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query02.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query02.sql
@@ -16,6 +16,7 @@
 -- limitations under the License.
 --
 
-select lstg_format_name,round(avg(sum(price)) over(partition by 
lstg_format_name order by cal_dt, lstg_format_name),0)
+select lstg_format_name,round(avg(sum(price)) over(partition by 
lstg_format_name order by cal_dt, lstg_format_name),0) rd
 from test_kylin_fact
 group by cal_dt, lstg_format_name
+order by lstg_format_name,rd
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query03.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query03.sql
index b9e38362cc..2337aa3cf8 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query03.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query03.sql
@@ -17,8 +17,9 @@
 --
 
 select lstg_format_name,
-sum(sum(price)) over(partition by lstg_format_name order by cal_dt, 
lstg_format_name),
-max(sum(price)) over(partition by lstg_format_name order by cal_dt, 
lstg_format_name),
-min(sum(price)) over(partition by lstg_format_name order by cal_dt, 
lstg_format_name)
+sum(sum(price)) over(partition by lstg_format_name order by cal_dt, 
lstg_format_name) sp,
+max(sum(price)) over(partition by lstg_format_name order by cal_dt, 
lstg_format_name) mp,
+min(sum(price)) over(partition by lstg_format_name order by cal_dt, 
lstg_format_name) mnp
 from test_kylin_fact
 group by cal_dt, lstg_format_name
+order by lstg_format_name,sp,mp,mnp
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query04.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query04.sql
index 648b126767..1917a779bd 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query04.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query04.sql
@@ -16,8 +16,9 @@
 -- limitations under the License.
 --
 
-select cal_dt, lstg_format_name, sum(price),
+select cal_dt, lstg_format_name, sum(price) sp,
 rank() over(partition by lstg_format_name order by cal_dt, lstg_format_name) 
as "RANK",
 dense_rank() over(partition by lstg_format_name order by cal_dt, 
lstg_format_name) as "DENSE_RANK"
 from test_kylin_fact
 group by cal_dt, lstg_format_name
+order by cal_dt,lstg_format_name,sp,"RANK","DENSE_RANK"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query05.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query05.sql
index 41fa43e564..7df37ba3f8 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query05.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query05.sql
@@ -25,3 +25,4 @@ ntile(4) over (partition by lstg_format_name order by cal_dt) 
as "quarter"
 from test_kylin_fact
 where cal_dt < '2012-02-01'
 group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name,"first","current","prev","next","quarter"
\ No newline at end of file
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query06.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query06.sql
index 6f9c6877ba..637f88c644 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query06.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query06.sql
@@ -22,3 +22,4 @@ when 0.0 then 0 else sum(price)/lag(sum(price)) 
over(partition by lstg_format_na
 from test_kylin_fact
 where cal_dt < '2012-02-01'
 group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name, GMV, "prev"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query07.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query07.sql
index eef5a37594..a1d538b414 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query07.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query07.sql
@@ -22,3 +22,4 @@ last_value(sum(price)) over (partition by lstg_format_name 
order by cal_dt rows
 from test_kylin_fact
 where cal_dt < '2012-02-01'
 group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name, GMV, "prev 2 rows", "next 2 rows"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query08.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query08.sql
index e239e87b72..b2615b2c41 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query08.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query08.sql
@@ -22,3 +22,4 @@ last_value(sum(price)) over (partition by lstg_format_name 
order by cast(cal_dt
 from test_kylin_fact
 where cal_dt < '2012-02-01'
 group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name, GMV, "prev 3 days", "next 3 days"
diff --git 
a/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected 
b/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected
index 62bf7bd9b7..0051ef33fb 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected
+++ b/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected
@@ -21,3 +21,4 @@ last_value(sum(price)) over (partition by lstg_format_name 
order by cast(cast(ca
 from test_kylin_fact
 where cal_dt < '2012-02-01'
 group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name, GMV, "prev 3 days", "next 3 days"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query09.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query09.sql
index c28318d180..a4c304c111 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query09.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query09.sql
@@ -25,3 +25,4 @@ select * from (
   group by cal_dt, lstg_format_name
 )t
 where cal_dt between '2013-01-06' and '2013-01-15'
+order by cal_dt, lstg_format_name, GMV, "last_day", "last_year"
diff --git 
a/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected 
b/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected
index 0fe1d6bba3..c9e4985342 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected
+++ b/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected
@@ -27,3 +27,4 @@ with tmptb as (select cal_dt, lstg_format_name, sum(price) as 
GMV,
 select * from
 tmptb
 where cal_dt between '2013-01-06' and '2013-01-15'
+order by cal_dt, lstg_format_name, GMV, "last_day", "last_year"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query12.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query12.sql
index b9f23ead11..7e795ee055 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query12.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query12.sql
@@ -24,3 +24,4 @@ select * from(
   group by cal_dt, lstg_format_name,SLR_SEGMENT_CD
 )t
 where cal_dt between '2013-01-06' and '2013-01-15'
+order by cal_dt, lstg_format_name, GMV, "last_day", "last_year"
diff --git 
a/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected 
b/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected
index 8fd51963bf..64ff8b47f9 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected
+++ b/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected
@@ -24,3 +24,4 @@ select * from(
   group by cal_dt, lstg_format_name,SLR_SEGMENT_CD
 )t
 where cal_dt between '2013-01-06' and '2013-01-15'
+order by cal_dt, lstg_format_name, GMV, "last_day", "last_year"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query13.sql 
b/src/kylin-it/src/test/resources/query/sql_window/query13.sql
index 9aff463519..621bd3673b 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query13.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query13.sql
@@ -25,3 +25,4 @@ ntile(4) over (partition by lstg_format_name order by cal_dt) 
as "quarter"
 from test_kylin_fact
 where cal_dt < '2012-02-01'
 group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name,GMV,"first","current","prev","next","quarter"
diff --git 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java
 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java
index f90f597f25..4baac3673a 100644
--- 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java
+++ 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java
@@ -47,6 +47,7 @@ import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.optimizer.ConvertInnerJoinToSemiJoin;
 import org.apache.spark.sql.common.GlutenTestConfig;
+import org.apache.spark.sql.execution.datasource.KylinDeltaSourceStrategy;
 import org.apache.spark.sql.internal.StaticSQLConf;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
@@ -123,6 +124,7 @@ public class NLocalWithSparkSessionTestBase extends 
NLocalFileMetadataTestCase i
         cleanupAnyExistingSession();
         ss = SparkSession.builder().withExtensions(ext -> {
             ext.injectOptimizerRule(ss -> new ConvertInnerJoinToSemiJoin());
+            ext.injectPlannerStrategy(ss -> new KylinDeltaSourceStrategy());
             return null;
         }).config(sparkConf).getOrCreate();
         SparderEnv.setSparkSession(ss);
diff --git a/src/spark-project/kylin-internal-catalog/pom.xml 
b/src/spark-project/kylin-internal-catalog/pom.xml
index bc8387d71a..f67ee74c4e 100644
--- a/src/spark-project/kylin-internal-catalog/pom.xml
+++ b/src/spark-project/kylin-internal-catalog/pom.xml
@@ -79,6 +79,12 @@
             </exclusions>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.gluten</groupId>
+            <artifactId>gluten-iceberg</artifactId>
+            <version>${gluten.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-catalyst_2.12</artifactId>
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index 01a16902c3..ef18bc8700 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -308,7 +308,7 @@ object SparderEnv extends Logging {
   def injectExtensions(sse: SparkSessionExtensions): Unit = {
     sse.injectPlannerStrategy(_ => KylinSourceStrategy)
     sse.injectPlannerStrategy(_ => LayoutFileSourceStrategy)
-    sse.injectPlannerStrategy(_ => KylinDeltaSourceStrategy)
+    sse.injectPlannerStrategy(_ => new KylinDeltaSourceStrategy)
     sse.injectPostHocResolutionRule(HiveStorageRule)
     sse.injectOptimizerRule(_ => new ConvertInnerJoinToSemiJoin())
     if (KapConfig.getInstanceFromEnv.isConstraintPropagationEnabled) {
diff --git a/src/spark-project/spark-common/pom.xml 
b/src/spark-project/spark-common/pom.xml
index 7e3e0b738c..deb32d34d0 100644
--- a/src/spark-project/spark-common/pom.xml
+++ b/src/spark-project/spark-common/pom.xml
@@ -193,6 +193,12 @@
             <version>${gluten.version}</version>
             <scope>${gluten.deps.scope}</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.gluten</groupId>
+            <artifactId>gluten-iceberg</artifactId>
+            <version>${gluten.version}</version>
+            <scope>${gluten.deps.scope}</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.gluten</groupId>
             <artifactId>spark-sql-columnar-shims-kyspark</artifactId>
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala
index 67892f2912..02b6d21d13 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala
@@ -56,7 +56,7 @@ import org.apache.spark.util.collection.BitSet
  *     is under the threshold with the addition of the next file, add it.  If 
not, open a new bucket
  *     and add it.  Proceed to the next file.
  */
-object KylinDeltaSourceStrategy extends Strategy with PredicateHelper with Logging {
+class KylinDeltaSourceStrategy extends Strategy with PredicateHelper with Logging {
 
   // should prune buckets iff num buckets is greater than 1 and there is only 
one bucket column
   private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean = {
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
index e1b59b8831..d122f945dc 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
@@ -22,7 +22,7 @@ import 
org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenPlan, V
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.utils.PushDownUtil
-import org.apache.spark.sql.execution.{KylinFileSourceScanExec, 
LayoutFileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.{KylinFileSourceScanExec, 
KylinStorageScanExec, LayoutFileSourceScanExec, SparkPlan}
 
 class ConvertKylinFileSourceToGlutenRule(val session: SparkSession) extends 
Rule[SparkPlan] {
 
@@ -72,5 +72,19 @@ class ConvertKylinFileSourceToGlutenRule(val session: 
SparkSession) extends Rule
       )
       // Transformer validate
       tryReturnGlutenPlan(transformer, l)
+    case v3Scan: KylinStorageScanExec =>
+      // convert to Gluten transformer
+      val transformer = new KylinStorageScanExecTransformer(
+        v3Scan.relation,
+        v3Scan.output,
+        v3Scan.requiredSchema,
+        v3Scan.partitionFilters,
+        v3Scan.optionalBucketSet,
+        v3Scan.optionalNumCoalescedBuckets,
+        PushDownUtil.removeNotSupportPushDownFilters(v3Scan.conf, v3Scan.output, v3Scan.dataFilters),
+        v3Scan.tableIdentifier,
+        v3Scan.disableBucketedScan)
+
+      tryReturnGlutenPlan(transformer, v3Scan)
   }
 }
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinStorageScanExecTransformer.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinStorageScanExecTransformer.scala
new file mode 100644
index 0000000000..1677309fd9
--- /dev/null
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinStorageScanExecTransformer.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.gluten
+
+import org.apache.gluten.execution.FileSourceScanExecTransformer
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.collection.BitSet
+
+class KylinStorageScanExecTransformer(@transient relation: HadoopFsRelation,
+                                      output: Seq[Attribute],
+                                      requiredSchema: StructType,
+                                      partitionFilters: Seq[Expression],
+                                      optionalBucketSet: Option[BitSet],
+                                      optionalNumCoalescedBuckets: Option[Int],
+                                      dataFilters: Seq[Expression],
+                                      tableIdentifier: Option[TableIdentifier],
+                                      disableBucketedScan: Boolean = false) extends FileSourceScanExecTransformer(
+  relation,
+  output,
+  requiredSchema,
+  partitionFilters,
+  optionalBucketSet,
+  optionalNumCoalescedBuckets,
+  dataFilters,
+  tableIdentifier,
+  disableBucketedScan) {
+
+}
diff --git 
a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
 
b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
index 47bc43014b..36d7378760 100644
--- 
a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
+++ 
b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
@@ -27,12 +27,22 @@ object GlutenTestConfig extends Logging {
 
   private val GLUTEN_CH_LIB_PATH_KEY = "clickhouse.lib.path"
 
-  def configGluten(conf: SparkConf): Unit = {
+  def enableGluten(): Boolean = {
     val chLibPath = System.getProperty(GLUTEN_CH_LIB_PATH_KEY)
     if (StringUtils.isEmpty(chLibPath) || !new File(chLibPath).exists) {
       log.warn("-Dclickhouse.lib.path is not set or path not exists, skip gluten config")
-      return // skip
+      false // skip
+    } else {
+      true
     }
+  }
+
+  def configGluten(conf: SparkConf): Unit = {
+    if (!enableGluten()) {
+      return
+    }
+
+    val chLibPath = System.getProperty(GLUTEN_CH_LIB_PATH_KEY)
     conf.set("spark.gluten.enabled", "true")
     conf.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
     conf.set("spark.gluten.sql.columnar.libpath", chLibPath)
diff --git 
a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala
 
b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala
index 970ff99076..af601c371b 100644
--- 
a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala
+++ 
b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.execution
 
+import java.io.File
 import java.util.concurrent.TimeUnit
 
+import org.apache.commons.lang3.StringUtils
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.cache.softaffinity.SoftAffinityConstants
 import org.apache.spark.SparkFunSuite
@@ -31,8 +34,9 @@ import org.apache.spark.sql.common.LocalMetadata
 import org.apache.spark.sql.delta.KylinDeltaLogFileIndex
 import org.apache.spark.sql.delta.actions.AddFile
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.datasource.{FilePruner, 
KylinDeltaSourceStrategy, KylinSourceStrategy, LayoutFileSourceStrategy}
-import org.apache.spark.sql.execution.datasources.{CacheFileScanRDD, 
FileIndex, FileScanRDD, HadoopFsRelation, LogicalRelation, PartitionDirectory}
+import org.apache.spark.sql.execution.datasource._
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.gluten.KylinStorageScanExecTransformer
 import org.mockito.{ArgumentMatchers, Mockito}
 
 import com.google.common.cache.CacheBuilder
@@ -56,7 +60,7 @@ class KylinFileSourceScanExecSuite extends SparkFunSuite
       .withExtensions { ext =>
         ext.injectPlannerStrategy(_ => KylinSourceStrategy)
         ext.injectPlannerStrategy(_ => LayoutFileSourceStrategy)
-        ext.injectPlannerStrategy(_ => KylinDeltaSourceStrategy)
+        ext.injectPlannerStrategy(_ => new KylinDeltaSourceStrategy)
       }
       .getOrCreate()
 
@@ -80,6 +84,47 @@ class KylinFileSourceScanExecSuite extends SparkFunSuite
     spark.sparkContext.stop()
   }
 
+  test("[Gluten] Create sharding read RDD with Soft affinity - CacheFileScanRDD") {
+    val chLibPath = System.getProperty("clickhouse.lib.path")
+    if (StringUtils.isEmpty(chLibPath) || !new File(chLibPath).exists) {
+      log.warn("-Dclickhouse.lib.path is not set or path not exists, skip gluten config")
+    } else {
+      SparkSession.cleanupAnyExistingSession()
+      val spark = SparkSession.builder()
+        .master("local[1]")
+        .config(SoftAffinityConstants.PARAMS_KEY_SOFT_AFFINITY_ENABLED, "true")
+        .config("spark.sql.shuffle.partitions", "1")
+        .config("spark.sql.adaptive.enabled", "false")
+        .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
+        .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+        .config("spark.io.compression.codec", "LZ4")
+        .config("spark.gluten.sql.columnar.libpath", chLibPath)
+        .config("spark.gluten.sql.enable.native.validation", "false")
+        .config("spark.memory.offHeap.enabled", "true")
+        .config("spark.memory.offHeap.size", "2G")
+        .config("spark.gluten.sql.columnar.extended.columnar.pre.rules",
+          "org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule")
+        .withExtensions { ext =>
+          ext.injectPlannerStrategy(_ => KylinSourceStrategy)
+          ext.injectPlannerStrategy(_ => LayoutFileSourceStrategy)
+          ext.injectPlannerStrategy(_ => new KylinDeltaSourceStrategy)
+        }
+        .getOrCreate()
+
+      withTempPath { path =>
+        val tempDir = path.getCanonicalPath
+        val df = createSimpleFileDeltaDF(spark, tempDir)
+        val transformed = HeuristicTransform.static()(df.queryExecution.executedPlan)
+        val res = transformed.collect {
+          case p: KylinStorageScanExecTransformer => p
+        }
+        assertResult(1)(res.size)
+      }
+
+      spark.sparkContext.stop()
+    }
+  }
+
   test("Create sharding read RDD without Soft affinity - FileScanRDD") {
     withTempPath { path =>
       SparkSession.cleanupAnyExistingSession()

Reply via email to