[
https://issues.apache.org/jira/browse/SPARK-37595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
wang-zhun updated SPARK-37595:
------------------------------
Description:
Queries against DataSourceV2 tables are very slow when running TPC-DS, because
for an `exists ... select *` subquery the pruned columns are not pushed down to
the data source.
To reproduce, add this test to `org.apache.spark.sql.connector.DataSourceV2SQLSuite`:
{code:java}
test("datasourcev2 exists") {
  val t1 = s"${catalogAndNamespace}t1"
  withTable(t1) {
    sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format")
    val t2 = s"${catalogAndNamespace}t2"
    withTable(t2) {
      sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format")
      val query = sql(s"select * from $t1 where not exists " +
        s"(select * from $t2 where t1.col1=t2.col1)").queryExecution
      // Print the physical plan so the scanned columns can be inspected.
      // scalastyle:off println
      println(query.executedPlan)
      // scalastyle:on println
    }
  }
}

AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false
   :- Project [col1#17, col2#18]
   :  +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28]
      +- Project [col1#19]
         +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
{code}
Expected: the scan on the build side of the join should read only `col1`, i.e.
`BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []`
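Reason: `Batch("Early Filter and Projection Push-Down"`, which contains
`V2ScanRelationPushDown`, is executed before `Batch("RewriteSubquery"`, so the
columns of the V2 scan are already fixed when the `exists` subquery is rewritten
into a join and its unused columns are pruned. In addition, DataSourceV2 does
not go through `FileSourceStrategy`, so nothing prunes the scan later.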
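
The ordering problem can be sketched with a toy model. This is not Spark's optimizer: every name below (`Relation`, `Scan`, `ExistsFilter`, `pushDown`, `rewriteSubquery`, ...) is hypothetical and exists only for this illustration. It assumes that push-down fixes a scan's column list once, and that the subquery rewrite is what reveals that only the join key is needed.

```scala
// Toy model only: none of these types or functions are Spark APIs.
object BatchOrderSketch {
  sealed trait Plan
  // A table reference whose scan has not been planned yet.
  final case class Relation(table: String, columns: Seq[String]) extends Plan
  // A planned scan: its column list can no longer change.
  final case class Scan(table: String, columns: Seq[String]) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan
  // An EXISTS predicate that has not been rewritten into a join yet.
  final case class ExistsFilter(child: Plan, subquery: Plan, key: String) extends Plan
  final case class SemiJoin(left: Plan, right: Plan, key: String) extends Plan

  // Stand-in for scan push-down: fixes the columns of every relation,
  // using the Project directly above it when there is one.
  def pushDown(plan: Plan): Plan = plan match {
    case Project(cols, Relation(t, _)) => Project(cols, Scan(t, cols))
    case Relation(t, all)              => Scan(t, all)
    case Project(cols, child)          => Project(cols, pushDown(child))
    case ExistsFilter(c, s, k)         => ExistsFilter(pushDown(c), pushDown(s), k)
    case SemiJoin(l, r, k)             => SemiJoin(pushDown(l), pushDown(r), k)
    case s: Scan                       => s
  }

  // Stand-in for the subquery rewrite plus the pruning that follows it:
  // EXISTS becomes a semi join whose right side only needs the join key.
  def rewriteSubquery(plan: Plan): Plan = plan match {
    case ExistsFilter(c, s, k) =>
      SemiJoin(rewriteSubquery(c), Project(Seq(k), rewriteSubquery(s)), k)
    case Project(cols, child)  => Project(cols, rewriteSubquery(child))
    case SemiJoin(l, r, k)     => SemiJoin(rewriteSubquery(l), rewriteSubquery(r), k)
    case other                 => other
  }

  // Collects every planned scan in the tree, left to right.
  def scans(plan: Plan): Seq[Scan] = plan match {
    case s: Scan               => Seq(s)
    case _: Relation           => Seq.empty
    case Project(_, child)     => scans(child)
    case ExistsFilter(c, s, _) => scans(c) ++ scans(s)
    case SemiJoin(l, r, _)     => scans(l) ++ scans(r)
  }

  val query: Plan = ExistsFilter(
    child = Relation("t1", Seq("col1", "col2")),
    subquery = Relation("t2", Seq("col1", "col2")),
    key = "col1")

  // Order described in this issue: push-down first, rewrite second.
  // The t2 scan is fixed with col1 and col2 before the Project appears.
  val pushDownFirst: Plan = rewriteSubquery(pushDown(query))

  // Reversed order, for comparison only: the t2 scan sees the Project
  // and keeps col1 alone.
  val rewriteFirst: Plan = pushDown(rewriteSubquery(query))

  def main(args: Array[String]): Unit = {
    println(scans(pushDownFirst))
    println(scans(rewriteFirst))
  }
}
```

In the first ordering the model ends with a `Project` on `col1` above a `t2` scan that still reads both columns, which has the same shape as the plan printed above. The reversed ordering is shown only to make the dependency visible; it is not meant as the proposed fix.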
> DatasourceV2 `exists ... select *` column push down
> ---------------------------------------------------
>
> Key: SPARK-37595
> URL: https://issues.apache.org/jira/browse/SPARK-37595
> Project: Spark
> Issue Type: Wish
> Components: SQL
> Affects Versions: 3.1.2, 3.2.0
> Reporter: wang-zhun
> Priority: Major