[ https://issues.apache.org/jira/browse/SPARK-25474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16908660#comment-16908660 ]
Yuming Wang commented on SPARK-25474:
-------------------------------------

More benchmark results for https://github.com/apache/spark/pull/24715. Each of the 39 repeated runs of

{noformat}
scala> spark.sql("explain cost select * from test_non_partition_300000").show
{noformat}

returned the same size (706507984 bytes) from both code paths and printed the same truncated optimized plan; only the timings differed:

{noformat}
Run | Get size from relation (ms) | Get size fall back to HDFS (ms)
----+-----------------------------+--------------------------------
  1 | 169                         | 228
  2 | 161                         | 209
  3 | 136                         | 188
  4 | 149                         | 164
  5 | 147                         | 184
  6 | 140                         | 458
  7 | 115                         | 375
  8 | 142                         | 202
  9 | 145                         | 206
 10 | 385                         | 213
 11 | 142                         | 194
 12 | 159                         | 187
 13 | 136                         | 188
 14 | 180                         | 196
 15 | 122                         | 176
 16 | 179                         | 187
 17 | 251                         | 192
 18 | 132                         | 175
 19 | 154                         | 188
 20 | 142                         | 186
 21 | 199                         | 256
 22 | 220                         | 264
 23 | 153                         | 300
 24 | 157                         | 402
 25 | 119                         | 220
 26 | 126                         | 189
 27 | 154                         | 213
 28 | 132                         | 197
 29 | 266                         | 220
 30 | 135                         | 281
 31 | 139                         | 204
 32 | 112                         | 359
 33 | 164                         | 363
 34 | 139                         | 202
 35 | 140                         | 195
 36 | 137                         | 192
 37 | 145                         | 201
 38 | 111                         | 267
 39 | 126                         | 226
{noformat}

Enable {{spark.sql.statistics.fallBackToHdfs}} for partitioned tables:

{noformat}
scala> spark.sql("set spark.sql.statistics.fallBackToHdfs")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("explain cost select * from test_partition_10000").show(false)
Get size from relation: 9223372036854775807, time: 0
Get size fall back to HDFS: 1036799332, time: 47
== Optimized Logical Plan ==
Relation[id#15L,c3#16L,c2#17L] parquet, Statistics(sizeInBytes=988.8 MiB)

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet zeta_spark_dss.test_partition_10000[id#15L,c3#16L,c2#17L] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[hdfs://hercules/sys/edw/zeta_spark_test_suite/dss/test_partition_10000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,c3:bigint>
{noformat}

> Support `spark.sql.statistics.fallBackToHdfs` in data source tables
> -------------------------------------------------------------------
>
>                 Key: SPARK-25474
>                 URL: https://issues.apache.org/jira/browse/SPARK-25474
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1, 2.4.3
>        Environment: Spark 2.3.1
>                     Hadoop 2.7.2
>            Reporter: Ayush Anubhava
>            Assignee: shahid
>            Priority: Major
>             Fix For: 2.3.4, 2.4.4, 3.0.0
>
> *Description:* The estimated size in bytes of the query comes out in EB (exabytes) for parquet data sources.
> This degrades performance, since join queries then always go as Sort Merge Join.
> *Precondition:* spark.sql.statistics.fallBackToHdfs = true
> Steps:
> {code:java}
> 0: jdbc:hive2://10.xx:23040/default> create table t1110 (a int, b string) using parquet PARTITIONED BY (b);
> +---------+--+
> | Result  |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.1xx:23040/default> insert into t1110 values (2,'b');
> +---------+--+
> | Result  |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.1xx:23040/default> insert into t1110 values (1,'a');
> +---------+--+
> | Result  |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.xx.xx:23040/default> select * from t1110;
> +----+----+--+
> | a  | b  |
> +----+----+--+
> | 1  | a  |
> | 2  | b  |
> +----+----+--+
> {code}
> *{color:#d04437}The cost of the query shows sizeInBytes in EB{color}*
> {code:java}
> explain cost select * from t1110;
> == Optimized Logical Plan ==
> Relation[a#23,b#24] parquet, Statistics(sizeInBytes=8.0 EB, hints=none)
> == Physical Plan ==
> *(1) FileScan parquet open.t1110[a#23,b#24] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t1110], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
> {code}
> *{color:#d04437}This leads to Sort Merge Join for join queries{color}*
> {code:java}
> 0: jdbc:hive2://10.xx.xx:23040/default> create table t110 (a int, b string) using parquet PARTITIONED BY (b);
> +---------+--+
> | Result  |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.xx.xx:23040/default> insert into t110 values (1,'a');
> +---------+--+
> | Result  |
> +---------+--+
> +---------+--+
> explain select * from t1110 t1 join t110 t2 on t1.a=t2.a;
> == Physical Plan ==
> *(5) SortMergeJoin [a#23], [a#55], Inner
> :- *(2) Sort [a#23 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(a#23, 200)
> :     +- *(1) Project [a#23, b#24]
> :        +- *(1) Filter isnotnull(a#23)
> :           +- *(1) FileScan parquet open.t1110[a#23,b#24] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t1110], PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
> +- *(4) Sort [a#55 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(a#55, 200)
>       +- *(3) Project [a#55, b#56]
>          +- *(3) Filter isnotnull(a#55)
>             +- *(3) FileScan parquet open.t110[a#55,b#56] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t110], PartitionCount: 1, PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
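A note on the mechanism behind the numbers above: the 9223372036854775807 that prints as "sizeInBytes=8.0 EB" is simply Long.MaxValue, the pessimistic default Spark uses when no table statistics are available; a size that large can never fall under the broadcast threshold, so the planner picks Sort Merge Join. The following is a minimal, self-contained Scala sketch of that decision logic, not Spark's actual implementation; the object and method names are illustrative only:

```scala
// Sketch (assumed simplification of Spark's size-estimation behavior):
// if catalog statistics exist, use them; otherwise fall back to HDFS when
// spark.sql.statistics.fallBackToHdfs is enabled, else use the pessimistic
// default of Long.MaxValue (which renders as 8.0 EB in explain cost).
object SizeEstimationSketch {
  // Long.MaxValue = 9223372036854775807 bytes, displayed as 8.0 EB
  val DefaultSizeInBytes: Long = Long.MaxValue

  /** Size the optimizer would use for join planning. `hdfsSize` is by-name
    * so the (potentially slow) file-system scan only runs when needed. */
  def estimatedSize(catalogStats: Option[Long],
                    fallBackToHdfs: Boolean,
                    hdfsSize: => Long): Long = catalogStats match {
    case Some(bytes)            => bytes              // trust ANALYZE TABLE stats
    case None if fallBackToHdfs => hdfsSize           // scan the file system
    case None                   => DefaultSizeInBytes // pessimistic default
  }

  /** Broadcast decision, mirroring spark.sql.autoBroadcastJoinThreshold
    * (10 MB by default). Long.MaxValue can never pass this check. */
  def canBroadcast(sizeInBytes: Long, threshold: Long = 10L * 1024 * 1024): Boolean =
    sizeInBytes <= threshold
}
```

With this model, the repro above behaves as reported: t1110 has no catalog stats, so without the HDFS fallback its size is 8.0 EB and `canBroadcast` is false, forcing Sort Merge Join; with the fallback enabled the real on-disk size (e.g. the 1036799332 bytes measured for test_partition_10000) is used instead.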