This is an automated email from the ASF dual-hosted git repository.
caolu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new d0433ae8cb KYLIN-6063 Fix KylinStorageScanExec fallback
d0433ae8cb is described below
commit d0433ae8cbeb1860592c38fc9cbaa42303342558
Author: Shuai li <[email protected]>
AuthorDate: Tue Jan 7 10:13:07 2025 +0800
KYLIN-6063 Fix KylinStorageScanExec fallback
---
.../apache/kylin/gluten/GlutenKEConfigTest.java | 11 +++++
.../apache/kylin/util/QueryResultComparator.java | 2 +-
.../query/sql_window/new_sql_window/query20.sql | 10 ++---
.../test/resources/query/sql_window/query01.sql | 4 +-
.../test/resources/query/sql_window/query02.sql | 3 +-
.../test/resources/query/sql_window/query03.sql | 7 +--
.../test/resources/query/sql_window/query04.sql | 3 +-
.../test/resources/query/sql_window/query05.sql | 1 +
.../test/resources/query/sql_window/query06.sql | 1 +
.../test/resources/query/sql_window/query07.sql | 1 +
.../test/resources/query/sql_window/query08.sql | 1 +
.../query/sql_window/query08.sql.expected | 1 +
.../test/resources/query/sql_window/query09.sql | 1 +
.../query/sql_window/query09.sql.expected | 1 +
.../test/resources/query/sql_window/query12.sql | 1 +
.../query/sql_window/query12.sql.expected | 1 +
.../test/resources/query/sql_window/query13.sql | 1 +
.../spark/NLocalWithSparkSessionTestBase.java | 2 +
src/spark-project/kylin-internal-catalog/pom.xml | 6 +++
.../scala/org/apache/spark/sql/SparderEnv.scala | 2 +-
src/spark-project/spark-common/pom.xml | 6 +++
.../datasource/KylinDeltaSourceStrategy.scala | 2 +-
.../ConvertKylinFileSourceToGlutenRule.scala | 16 ++++++-
.../gluten/KylinStorageScanExecTransformer.scala | 47 ++++++++++++++++++++
.../apache/spark/sql/common/GlutenTestConfig.scala | 14 +++++-
.../execution/KylinFileSourceScanExecSuite.scala | 51 ++++++++++++++++++++--
26 files changed, 176 insertions(+), 20 deletions(-)
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java b/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java
index bb414d2323..400b99c418 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/gluten/GlutenKEConfigTest.java
@@ -18,10 +18,12 @@
package org.apache.kylin.gluten;
+import org.apache.gluten.execution.FilterExecTransformer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.query.engine.QueryExec;
import org.apache.kylin.rec.util.AccelerationUtil;
import org.apache.kylin.util.SuggestTestBase;
+import org.apache.spark.sql.common.GlutenTestConfig;
import org.junit.Assert;
import org.junit.Test;
@@ -41,6 +43,15 @@ public class GlutenKEConfigTest extends SuggestTestBase {
Assert.assertTrue(resultSet.getSize() > 0);
}
+ @Test
+ public void testGlutenIsEnable() throws Exception {
+ if (!GlutenTestConfig.enableGluten()) {
+ return;
+ }
+ val a = ss.range(100).filter("id < 5").queryExecution().executedPlan();
+ Assert.assertTrue(a.find(FilterExecTransformer.class::isInstance).nonEmpty());
+ }
+
private void proposeAndBuildIndex(String[] sqls) throws InterruptedException {
AccelerationUtil.runWithSmartContext(KylinConfig.getInstanceFromEnv(), PROJECT, sqls, true);
buildAllModels(KylinConfig.getInstanceFromEnv(), PROJECT);
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java b/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java
index 4fe6a5840b..0c4c6f4d74 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/util/QueryResultComparator.java
@@ -129,7 +129,7 @@ public class QueryResultComparator {
private static void printRows(String source, List<String> rows) {
log.info("***********" + source + " start, only show top 100 result**********");
- rows.stream().limit(100).forEach(log::info);
+ rows.stream().limit(11000).forEach(log::info);
log.info("***********" + source + " end**********");
}
}
diff --git
a/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql
b/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql
index bec5dcabaf..91566ae288 100644
---
a/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql
+++
b/src/kylin-it/src/test/resources/query/sql_window/new_sql_window/query20.sql
@@ -22,9 +22,9 @@
-- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--
select
-lag(cal_dt,50,date'2019-01-01') over (partition by seller_id order by item_count)
-,lag(cal_dt,50,'2019-01-01') over (partition by seller_id order by item_count)
-,lead(item_count,1,'2019') over (partition by seller_id order by cal_dt)
-,lead(item_count,1,2019) over (partition by seller_id order by cal_dt)
-,lead(price,1,1.234) over (partition by seller_id order by cal_dt)
+lag(cal_dt,50,date'2019-01-01') over (partition by seller_id order by item_count,order_id)
+,lag(cal_dt,50,'2019-01-01') over (partition by seller_id order by item_count,order_id)
+,lead(item_count,1,'2019') over (partition by seller_id order by cal_dt,order_id)
+,lead(item_count,1,2019) over (partition by seller_id order by cal_dt,order_id)
+,lead(price,1,1.234) over (partition by seller_id order by cal_dt,order_id)
from test_kylin_fact
\ No newline at end of file
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query01.sql
b/src/kylin-it/src/test/resources/query/sql_window/query01.sql
index a41bad7a67..f309664061 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query01.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query01.sql
@@ -16,6 +16,8 @@
-- limitations under the License.
--
-select lstg_format_name, sum(price) as GMV, count(lstg_format_name)
over(partition by lstg_format_name order by cal_dt, lstg_format_name)
+select lstg_format_name, sum(price) as GMV,
+ count(lstg_format_name) over(partition by lstg_format_name order by
cal_dt) cnt
from test_kylin_fact
group by cal_dt, lstg_format_name
+order by lstg_format_name, GMV, cnt
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query02.sql
b/src/kylin-it/src/test/resources/query/sql_window/query02.sql
index dd00ce7f71..3ac915e061 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query02.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query02.sql
@@ -16,6 +16,7 @@
-- limitations under the License.
--
-select lstg_format_name,round(avg(sum(price)) over(partition by
lstg_format_name order by cal_dt, lstg_format_name),0)
+select lstg_format_name,round(avg(sum(price)) over(partition by
lstg_format_name order by cal_dt, lstg_format_name),0) rd
from test_kylin_fact
group by cal_dt, lstg_format_name
+order by lstg_format_name,rd
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query03.sql
b/src/kylin-it/src/test/resources/query/sql_window/query03.sql
index b9e38362cc..2337aa3cf8 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query03.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query03.sql
@@ -17,8 +17,9 @@
--
select lstg_format_name,
-sum(sum(price)) over(partition by lstg_format_name order by cal_dt,
lstg_format_name),
-max(sum(price)) over(partition by lstg_format_name order by cal_dt,
lstg_format_name),
-min(sum(price)) over(partition by lstg_format_name order by cal_dt,
lstg_format_name)
+sum(sum(price)) over(partition by lstg_format_name order by cal_dt,
lstg_format_name) sp,
+max(sum(price)) over(partition by lstg_format_name order by cal_dt,
lstg_format_name) mp,
+min(sum(price)) over(partition by lstg_format_name order by cal_dt,
lstg_format_name) mnp
from test_kylin_fact
group by cal_dt, lstg_format_name
+order by lstg_format_name,sp,mp,mnp
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query04.sql
b/src/kylin-it/src/test/resources/query/sql_window/query04.sql
index 648b126767..1917a779bd 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query04.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query04.sql
@@ -16,8 +16,9 @@
-- limitations under the License.
--
-select cal_dt, lstg_format_name, sum(price),
+select cal_dt, lstg_format_name, sum(price) sp,
rank() over(partition by lstg_format_name order by cal_dt, lstg_format_name)
as "RANK",
dense_rank() over(partition by lstg_format_name order by cal_dt,
lstg_format_name) as "DENSE_RANK"
from test_kylin_fact
group by cal_dt, lstg_format_name
+order by cal_dt,lstg_format_name,sp,"RANK","DENSE_RANK"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query05.sql
b/src/kylin-it/src/test/resources/query/sql_window/query05.sql
index 41fa43e564..7df37ba3f8 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query05.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query05.sql
@@ -25,3 +25,4 @@ ntile(4) over (partition by lstg_format_name order by cal_dt)
as "quarter"
from test_kylin_fact
where cal_dt < '2012-02-01'
group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name,"first","current","prev","next","quarter"
\ No newline at end of file
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query06.sql
b/src/kylin-it/src/test/resources/query/sql_window/query06.sql
index 6f9c6877ba..637f88c644 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query06.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query06.sql
@@ -22,3 +22,4 @@ when 0.0 then 0 else sum(price)/lag(sum(price))
over(partition by lstg_format_na
from test_kylin_fact
where cal_dt < '2012-02-01'
group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name, GMV, "prev"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query07.sql
b/src/kylin-it/src/test/resources/query/sql_window/query07.sql
index eef5a37594..a1d538b414 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query07.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query07.sql
@@ -22,3 +22,4 @@ last_value(sum(price)) over (partition by lstg_format_name
order by cal_dt rows
from test_kylin_fact
where cal_dt < '2012-02-01'
group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name, GMV, "prev 2 rows", "next 2 rows"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query08.sql
b/src/kylin-it/src/test/resources/query/sql_window/query08.sql
index e239e87b72..b2615b2c41 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query08.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query08.sql
@@ -22,3 +22,4 @@ last_value(sum(price)) over (partition by lstg_format_name
order by cast(cal_dt
from test_kylin_fact
where cal_dt < '2012-02-01'
group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name, GMV, "prev 3 days", "next 3 days"
diff --git
a/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected
b/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected
index 62bf7bd9b7..0051ef33fb 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected
+++ b/src/kylin-it/src/test/resources/query/sql_window/query08.sql.expected
@@ -21,3 +21,4 @@ last_value(sum(price)) over (partition by lstg_format_name
order by cast(cast(ca
from test_kylin_fact
where cal_dt < '2012-02-01'
group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name, GMV, "prev 3 days", "next 3 days"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query09.sql
b/src/kylin-it/src/test/resources/query/sql_window/query09.sql
index c28318d180..a4c304c111 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query09.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query09.sql
@@ -25,3 +25,4 @@ select * from (
group by cal_dt, lstg_format_name
)t
where cal_dt between '2013-01-06' and '2013-01-15'
+order by cal_dt, lstg_format_name, GMV, "last_day", "last_year"
diff --git
a/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected
b/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected
index 0fe1d6bba3..c9e4985342 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected
+++ b/src/kylin-it/src/test/resources/query/sql_window/query09.sql.expected
@@ -27,3 +27,4 @@ with tmptb as (select cal_dt, lstg_format_name, sum(price) as
GMV,
select * from
tmptb
where cal_dt between '2013-01-06' and '2013-01-15'
+order by cal_dt, lstg_format_name, GMV, "last_day", "last_year"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query12.sql
b/src/kylin-it/src/test/resources/query/sql_window/query12.sql
index b9f23ead11..7e795ee055 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query12.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query12.sql
@@ -24,3 +24,4 @@ select * from(
group by cal_dt, lstg_format_name,SLR_SEGMENT_CD
)t
where cal_dt between '2013-01-06' and '2013-01-15'
+order by cal_dt, lstg_format_name, GMV, "last_day", "last_year"
diff --git
a/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected
b/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected
index 8fd51963bf..64ff8b47f9 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected
+++ b/src/kylin-it/src/test/resources/query/sql_window/query12.sql.expected
@@ -24,3 +24,4 @@ select * from(
group by cal_dt, lstg_format_name,SLR_SEGMENT_CD
)t
where cal_dt between '2013-01-06' and '2013-01-15'
+order by cal_dt, lstg_format_name, GMV, "last_day", "last_year"
diff --git a/src/kylin-it/src/test/resources/query/sql_window/query13.sql
b/src/kylin-it/src/test/resources/query/sql_window/query13.sql
index 9aff463519..621bd3673b 100644
--- a/src/kylin-it/src/test/resources/query/sql_window/query13.sql
+++ b/src/kylin-it/src/test/resources/query/sql_window/query13.sql
@@ -25,3 +25,4 @@ ntile(4) over (partition by lstg_format_name order by cal_dt)
as "quarter"
from test_kylin_fact
where cal_dt < '2012-02-01'
group by cal_dt, lstg_format_name
+order by cal_dt, lstg_format_name,GMV,"first","current","prev","next","quarter"
diff --git
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java
index f90f597f25..4baac3673a 100644
---
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java
+++
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTestBase.java
@@ -47,6 +47,7 @@ import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.optimizer.ConvertInnerJoinToSemiJoin;
import org.apache.spark.sql.common.GlutenTestConfig;
+import org.apache.spark.sql.execution.datasource.KylinDeltaSourceStrategy;
import org.apache.spark.sql.internal.StaticSQLConf;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
@@ -123,6 +124,7 @@ public class NLocalWithSparkSessionTestBase extends
NLocalFileMetadataTestCase i
cleanupAnyExistingSession();
ss = SparkSession.builder().withExtensions(ext -> {
ext.injectOptimizerRule(ss -> new ConvertInnerJoinToSemiJoin());
+ ext.injectPlannerStrategy(ss -> new KylinDeltaSourceStrategy());
return null;
}).config(sparkConf).getOrCreate();
SparderEnv.setSparkSession(ss);
diff --git a/src/spark-project/kylin-internal-catalog/pom.xml
b/src/spark-project/kylin-internal-catalog/pom.xml
index bc8387d71a..f67ee74c4e 100644
--- a/src/spark-project/kylin-internal-catalog/pom.xml
+++ b/src/spark-project/kylin-internal-catalog/pom.xml
@@ -79,6 +79,12 @@
</exclusions>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-iceberg</artifactId>
+ <version>${gluten.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.12</artifactId>
diff --git
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index 01a16902c3..ef18bc8700 100644
---
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -308,7 +308,7 @@ object SparderEnv extends Logging {
def injectExtensions(sse: SparkSessionExtensions): Unit = {
sse.injectPlannerStrategy(_ => KylinSourceStrategy)
sse.injectPlannerStrategy(_ => LayoutFileSourceStrategy)
- sse.injectPlannerStrategy(_ => KylinDeltaSourceStrategy)
+ sse.injectPlannerStrategy(_ => new KylinDeltaSourceStrategy)
sse.injectPostHocResolutionRule(HiveStorageRule)
sse.injectOptimizerRule(_ => new ConvertInnerJoinToSemiJoin())
if (KapConfig.getInstanceFromEnv.isConstraintPropagationEnabled) {
diff --git a/src/spark-project/spark-common/pom.xml
b/src/spark-project/spark-common/pom.xml
index 7e3e0b738c..deb32d34d0 100644
--- a/src/spark-project/spark-common/pom.xml
+++ b/src/spark-project/spark-common/pom.xml
@@ -193,6 +193,12 @@
<version>${gluten.version}</version>
<scope>${gluten.deps.scope}</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-iceberg</artifactId>
+ <version>${gluten.version}</version>
+ <scope>${gluten.deps.scope}</scope>
+ </dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>spark-sql-columnar-shims-kyspark</artifactId>
diff --git
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala
index 67892f2912..02b6d21d13 100644
---
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala
+++
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala
@@ -56,7 +56,7 @@ import org.apache.spark.util.collection.BitSet
* is under the threshold with the addition of the next file, add it. If
not, open a new bucket
* and add it. Proceed to the next file.
*/
-object KylinDeltaSourceStrategy extends Strategy with PredicateHelper with Logging {
+class KylinDeltaSourceStrategy extends Strategy with PredicateHelper with Logging {
// should prune buckets iff num buckets is greater than 1 and there is only
one bucket column
private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean = {
diff --git
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
index e1b59b8831..d122f945dc 100644
---
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
+++
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
@@ -22,7 +22,7 @@ import
org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenPlan, V
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.utils.PushDownUtil
-import org.apache.spark.sql.execution.{KylinFileSourceScanExec, LayoutFileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.{KylinFileSourceScanExec, KylinStorageScanExec, LayoutFileSourceScanExec, SparkPlan}
class ConvertKylinFileSourceToGlutenRule(val session: SparkSession) extends
Rule[SparkPlan] {
@@ -72,5 +72,19 @@ class ConvertKylinFileSourceToGlutenRule(val session:
SparkSession) extends Rule
)
// Transformer validate
tryReturnGlutenPlan(transformer, l)
+ case v3Scan: KylinStorageScanExec =>
+ // convert to Gluten transformer
+ val transformer = new KylinStorageScanExecTransformer(
+ v3Scan.relation,
+ v3Scan.output,
+ v3Scan.requiredSchema,
+ v3Scan.partitionFilters,
+ v3Scan.optionalBucketSet,
+ v3Scan.optionalNumCoalescedBuckets,
+ PushDownUtil.removeNotSupportPushDownFilters(v3Scan.conf, v3Scan.output, v3Scan.dataFilters),
+ v3Scan.tableIdentifier,
+ v3Scan.disableBucketedScan)
+
+ tryReturnGlutenPlan(transformer, v3Scan)
}
}
diff --git
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinStorageScanExecTransformer.scala
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinStorageScanExecTransformer.scala
new file mode 100644
index 0000000000..1677309fd9
--- /dev/null
+++
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinStorageScanExecTransformer.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.gluten
+
+import org.apache.gluten.execution.FileSourceScanExecTransformer
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.collection.BitSet
+
+class KylinStorageScanExecTransformer(@transient relation: HadoopFsRelation,
+ output: Seq[Attribute],
+ requiredSchema: StructType,
+ partitionFilters: Seq[Expression],
+ optionalBucketSet: Option[BitSet],
+ optionalNumCoalescedBuckets: Option[Int],
+ dataFilters: Seq[Expression],
+ tableIdentifier: Option[TableIdentifier],
+ disableBucketedScan: Boolean = false) extends FileSourceScanExecTransformer(
+ relation,
+ output,
+ requiredSchema,
+ partitionFilters,
+ optionalBucketSet,
+ optionalNumCoalescedBuckets,
+ dataFilters,
+ tableIdentifier,
+ disableBucketedScan) {
+
+}
diff --git
a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
index 47bc43014b..36d7378760 100644
---
a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
+++
b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
@@ -27,12 +27,22 @@ object GlutenTestConfig extends Logging {
private val GLUTEN_CH_LIB_PATH_KEY = "clickhouse.lib.path"
- def configGluten(conf: SparkConf): Unit = {
+ def enableGluten(): Boolean = {
val chLibPath = System.getProperty(GLUTEN_CH_LIB_PATH_KEY)
if (StringUtils.isEmpty(chLibPath) || !new File(chLibPath).exists) {
log.warn("-Dclickhouse.lib.path is not set or path not exists, skip gluten config")
- return // skip
+ false // skip
+ } else {
+ true
}
+ }
+
+ def configGluten(conf: SparkConf): Unit = {
+ if (!enableGluten()) {
+ return
+ }
+
+ val chLibPath = System.getProperty(GLUTEN_CH_LIB_PATH_KEY)
conf.set("spark.gluten.enabled", "true")
conf.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
conf.set("spark.gluten.sql.columnar.libpath", chLibPath)
diff --git
a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala
b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala
index 970ff99076..af601c371b 100644
---
a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala
+++
b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/KylinFileSourceScanExecSuite.scala
@@ -17,8 +17,11 @@
package org.apache.spark.sql.execution
+import java.io.File
import java.util.concurrent.TimeUnit
+import org.apache.commons.lang3.StringUtils
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.hadoop.fs.Path
import org.apache.kylin.cache.softaffinity.SoftAffinityConstants
import org.apache.spark.SparkFunSuite
@@ -31,8 +34,9 @@ import org.apache.spark.sql.common.LocalMetadata
import org.apache.spark.sql.delta.KylinDeltaLogFileIndex
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.datasource.{FilePruner, KylinDeltaSourceStrategy, KylinSourceStrategy, LayoutFileSourceStrategy}
-import org.apache.spark.sql.execution.datasources.{CacheFileScanRDD, FileIndex, FileScanRDD, HadoopFsRelation, LogicalRelation, PartitionDirectory}
+import org.apache.spark.sql.execution.datasource._
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.gluten.KylinStorageScanExecTransformer
import org.mockito.{ArgumentMatchers, Mockito}
import com.google.common.cache.CacheBuilder
@@ -56,7 +60,7 @@ class KylinFileSourceScanExecSuite extends SparkFunSuite
.withExtensions { ext =>
ext.injectPlannerStrategy(_ => KylinSourceStrategy)
ext.injectPlannerStrategy(_ => LayoutFileSourceStrategy)
- ext.injectPlannerStrategy(_ => KylinDeltaSourceStrategy)
+ ext.injectPlannerStrategy(_ => new KylinDeltaSourceStrategy)
}
.getOrCreate()
@@ -80,6 +84,47 @@ class KylinFileSourceScanExecSuite extends SparkFunSuite
spark.sparkContext.stop()
}
+ test("[Gluten] Create sharding read RDD with Soft affinity - CacheFileScanRDD") {
+ val chLibPath = System.getProperty("clickhouse.lib.path")
+ if (StringUtils.isEmpty(chLibPath) || !new File(chLibPath).exists) {
+ log.warn("-Dclickhouse.lib.path is not set or path not exists, skip gluten config")
+ } else {
+ SparkSession.cleanupAnyExistingSession()
+ val spark = SparkSession.builder()
+ .master("local[1]")
+ .config(SoftAffinityConstants.PARAMS_KEY_SOFT_AFFINITY_ENABLED, "true")
+ .config("spark.sql.shuffle.partitions", "1")
+ .config("spark.sql.adaptive.enabled", "false")
+ .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
+ .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .config("spark.io.compression.codec", "LZ4")
+ .config("spark.gluten.sql.columnar.libpath", chLibPath)
+ .config("spark.gluten.sql.enable.native.validation", "false")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "2G")
+ .config("spark.gluten.sql.columnar.extended.columnar.pre.rules",
+ "org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule")
+ .withExtensions { ext =>
+ ext.injectPlannerStrategy(_ => KylinSourceStrategy)
+ ext.injectPlannerStrategy(_ => LayoutFileSourceStrategy)
+ ext.injectPlannerStrategy(_ => new KylinDeltaSourceStrategy)
+ }
+ .getOrCreate()
+
+ withTempPath { path =>
+ val tempDir = path.getCanonicalPath
+ val df = createSimpleFileDeltaDF(spark, tempDir)
+ val transformed = HeuristicTransform.static()(df.queryExecution.executedPlan)
+ val res = transformed.collect {
+ case p: KylinStorageScanExecTransformer => p
+ }
+ assertResult(1)(res.size)
+ }
+
+ spark.sparkContext.stop()
+ }
+ }
+
test("Create sharding read RDD without Soft affinity - FileScanRDD") {
withTempPath { path =>
SparkSession.cleanupAnyExistingSession()