[ https://issues.apache.org/jira/browse/SPARK-25474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16908660#comment-16908660 ]

Yuming Wang commented on SPARK-25474:
-------------------------------------

More benchmark results for https://github.com/apache/spark/pull/24715:
{noformat}
scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 169
Get size fall back to HDFS: 706507984, time: 228
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 161
Get size fall back to HDFS: 706507984, time: 209
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 136
Get size fall back to HDFS: 706507984, time: 188
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 149
Get size fall back to HDFS: 706507984, time: 164
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 147
Get size fall back to HDFS: 706507984, time: 184
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 140
Get size fall back to HDFS: 706507984, time: 458
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 115
Get size fall back to HDFS: 706507984, time: 375
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 142
Get size fall back to HDFS: 706507984, time: 202
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 145
Get size fall back to HDFS: 706507984, time: 206
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 385
Get size fall back to HDFS: 706507984, time: 213
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 142
Get size fall back to HDFS: 706507984, time: 194
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 159
Get size fall back to HDFS: 706507984, time: 187
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 136
Get size fall back to HDFS: 706507984, time: 188
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 180
Get size fall back to HDFS: 706507984, time: 196
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 122
Get size fall back to HDFS: 706507984, time: 176
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 179
Get size fall back to HDFS: 706507984, time: 187
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 251
Get size fall back to HDFS: 706507984, time: 192
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 132
Get size fall back to HDFS: 706507984, time: 175
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 154
Get size fall back to HDFS: 706507984, time: 188
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 142
Get size fall back to HDFS: 706507984, time: 186
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 199
Get size fall back to HDFS: 706507984, time: 256
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 220
Get size fall back to HDFS: 706507984, time: 264
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 153
Get size fall back to HDFS: 706507984, time: 300
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 157
Get size fall back to HDFS: 706507984, time: 402
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 119
Get size fall back to HDFS: 706507984, time: 220
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 126
Get size fall back to HDFS: 706507984, time: 189
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 154
Get size fall back to HDFS: 706507984, time: 213
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 132
Get size fall back to HDFS: 706507984, time: 197
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 266
Get size fall back to HDFS: 706507984, time: 220
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 135
Get size fall back to HDFS: 706507984, time: 281
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 139
Get size fall back to HDFS: 706507984, time: 204
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 112
Get size fall back to HDFS: 706507984, time: 359
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 164
Get size fall back to HDFS: 706507984, time: 363
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 139
Get size fall back to HDFS: 706507984, time: 202
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 140
Get size fall back to HDFS: 706507984, time: 195
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 137
Get size fall back to HDFS: 706507984, time: 192
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 145
Get size fall back to HDFS: 706507984, time: 201
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 111
Get size fall back to HDFS: 706507984, time: 267
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


scala> spark.sql("explain cost select * from test_non_partition_300000").show
Get size from relation: 706507984, time: 126
Get size fall back to HDFS: 706507984, time: 226
+--------------------+
|                plan|
+--------------------+
|== Optimized Logi...|
+--------------------+


{noformat}
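
The {{Get size ...}} lines above appear to come from timing instrumentation in the PR build; below is a minimal sketch of how the two paths can be timed by hand. The helper name is made up and it assumes the table's location is readable by the current user: the first path reads the optimizer statistics, the second does a Hadoop {{getContentSummary}} lookup.
{code:java}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Sketch (not the PR's code): time the optimizer-statistics path against
// a direct HDFS content-summary lookup for the same table.
def timeSizes(spark: SparkSession, table: String, location: String): Unit = {
  var start = System.currentTimeMillis()
  val fromRelation = spark.table(table).queryExecution.optimizedPlan.stats.sizeInBytes
  println(s"Get size from relation: $fromRelation, time: ${System.currentTimeMillis() - start}")

  start = System.currentTimeMillis()
  val path = new Path(location)
  val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
  val fromHdfs = fs.getContentSummary(path).getLength
  println(s"Get size fall back to HDFS: $fromHdfs, time: ${System.currentTimeMillis() - start}")
}
{code}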

With {{spark.sql.statistics.fallBackToHdfs}} enabled, for a partitioned table:
{noformat}
scala> spark.sql("set spark.sql.statistics.fallBackToHdfs")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("explain cost select * from test_partition_10000").show(false)
Get size from relation: 9223372036854775807, time: 0
Get size fall back to HDFS: 1036799332, time: 47
+--------------------------------------------------------------------------------+
|plan                                                                            |
+--------------------------------------------------------------------------------+
|== Optimized Logical Plan ==
Relation[id#15L,c3#16L,c2#17L] parquet, Statistics(sizeInBytes=988.8 MiB)

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet zeta_spark_dss.test_partition_10000[id#15L,c3#16L,c2#17L] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[hdfs://hercules/sys/edw/zeta_spark_test_suite/dss/test_partition_10000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,c3:bigint>
|
+--------------------------------------------------------------------------------+

{noformat}
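
The {{9223372036854775807}} above is {{Long.MaxValue}}, the size the optimizer assumes ({{spark.sql.defaultSizeInBytes}}) when a relation has no usable statistics; with the flag on, the real size is fetched from the file system instead. Roughly, the fallback idea looks like the sketch below (illustrative only, not the exact Spark code; the function name and signature are made up):
{code:java}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Illustrative sketch of the fallBackToHdfs decision: prefer catalog
// statistics; otherwise either query the file system or assume the
// spark.sql.defaultSizeInBytes default (Long.MaxValue).
def effectiveSizeInBytes(spark: SparkSession, catalogSize: Option[Long], location: String): Long = {
  val fallBack = spark.conf.get("spark.sql.statistics.fallBackToHdfs", "false").toBoolean
  catalogSize match {
    case Some(size) => size
    case None if fallBack =>
      val path = new Path(location)
      path.getFileSystem(spark.sparkContext.hadoopConfiguration).getContentSummary(path).getLength
    case None => Long.MaxValue
  }
}
{code}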

> Support `spark.sql.statistics.fallBackToHdfs` in data source tables
> -------------------------------------------------------------------
>
>                 Key: SPARK-25474
>                 URL: https://issues.apache.org/jira/browse/SPARK-25474
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1, 2.4.3
>         Environment: Spark 2.3.1
> Hadoop 2.7.2
>            Reporter: Ayush Anubhava
>            Assignee: shahid
>            Priority: Major
>             Fix For: 2.3.4, 2.4.4, 3.0.0
>
>
> *Description:* The size in bytes reported for the query comes out in EB (exabytes) for parquet data source tables. This hurts performance, since join queries then always fall back to Sort Merge Join instead of Broadcast Hash Join.
> *Precondition:* spark.sql.statistics.fallBackToHdfs = true
> Steps:
> {code:java}
> 0: jdbc:hive2://10.xx:23040/default> create table t1110 (a int, b string) using parquet PARTITIONED BY (b);
> +---------+--+
> | Result  |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.1xx:23040/default> insert into t1110 values (2,'b');
> +---------+--+
> | Result  |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.1xx:23040/default> insert into t1110 values (1,'a');
> +---------+--+
> | Result  |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.xx.xx:23040/default> select * from t1110;
> +----+----+--+
> | a  | b  |
> +----+----+--+
> | 1  | a  |
> | 2  | b  |
> +----+----+--+
> {code}
> *{color:#d04437}Cost of the query shows sizeInBytes in EB{color}*
> {code:java}
> explain cost select * from t1110;
> | == Optimized Logical Plan ==
> Relation[a#23,b#24] parquet, Statistics(sizeInBytes=8.0 EB, hints=none)
> == Physical Plan ==
> *(1) FileScan parquet open.t1110[a#23,b#24] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t1110], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int> |
> {code}
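> For context, 8.0 EB is just {{Long.MaxValue}} pretty-printed: with no usable statistics the optimizer substitutes {{spark.sql.defaultSizeInBytes}}, which defaults to {{Long.MaxValue}} (2^63 - 1 bytes, about 8 EiB). A quick shell check (a sketch; table name as above):
> {code:java}
> scala> Long.MaxValue / math.pow(1024, 6)   // bytes in one EiB = 1024^6
> res0: Double = 8.0
>
> // The inflated estimate can also be read off programmatically; this should
> // print 9223372036854775807 (Long.MaxValue) while the bug is present:
> scala> spark.table("t1110").queryExecution.optimizedPlan.stats.sizeInBytes
> {code}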
> *{color:#d04437}This leads to a Sort Merge Join for join queries{color}*
> {code:java}
> 0: jdbc:hive2://10.xx.xx:23040/default> create table t110 (a int, b string) using parquet PARTITIONED BY (b);
> +---------+--+
> | Result  |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.xx.xx:23040/default> insert into t110 values (1,'a');
> +---------+--+
> | Result  |
> +---------+--+
> +---------+--+
> explain select * from t1110 t1 join t110 t2 on t1.a=t2.a;
> | == Physical Plan ==
> *(5) SortMergeJoin [a#23], [a#55], Inner
> :- *(2) Sort [a#23 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(a#23, 200)
> :     +- *(1) Project [a#23, b#24]
> :        +- *(1) Filter isnotnull(a#23)
> :           +- *(1) FileScan parquet open.t1110[a#23,b#24] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t1110], PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
> +- *(4) Sort [a#55 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(a#55, 200)
>       +- *(3) Project [a#55, b#56]
>          +- *(3) Filter isnotnull(a#55)
>             +- *(3) FileScan parquet open.t110[a#55,b#56] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t110], PartitionCount: 1, PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int> |
> {code}
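> As a workaround until the statistics are fixed, the small side can be broadcast explicitly so the planner chooses Broadcast Hash Join regardless of the inflated estimate (a minimal sketch using the public {{broadcast}} hint):
> {code:java}
> import org.apache.spark.sql.functions.broadcast
>
> // Force a broadcast of the small table; the resulting plan should show
> // BroadcastHashJoin instead of SortMergeJoin.
> val big   = spark.table("open.t1110")
> val small = spark.table("open.t110")
> big.join(broadcast(small), "a").explain()
> {code}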


