[ https://issues.apache.org/jira/browse/SPARK-17593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gaurav Shah updated SPARK-17593:
--------------------------------
    Description:
Let's say we have the following partitioned data:
{code}
events_v3
-- event_date=2015-01-01
---- event_hour=0
------ verb=follow
--------part10000.parquet.gz
---- event_hour=1
------ verb=click
--------part10000.parquet.gz
-- event_date=2015-01-02
---- event_hour=5
------ verb=follow
--------part10000.parquet.gz
---- event_hour=10
------ verb=click
--------part10000.parquet.gz
{code}
To read (or write) partitioned parquet data, Spark calls `ListingFileCatalog.listLeafFiles`, which recursively lists all files and folders.
In this case, if we had 300 dates, 300 jobs would be created, each trying to get the file list from one date directory. This process takes about 10 minutes to finish (with 2 executors), whereas a ruby script that lists all files recursively under the same folder takes about 1 minute on the same machine with just 1 thread.
I am confused as to why listing the files takes so much longer in Spark.
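To make the gap between the two listing strategies concrete, here is a minimal sketch that counts list requests against an in-memory stand-in for the bucket. It is a simulation under stated assumptions, not Spark's or Hadoop's actual implementation: `FakeStore`, `build_keys` and `list_recursive` are hypothetical names, the layout of 24 hours and 2 verbs per date is assumed, and each directory level is assumed to cost at least one request. It models request counts only, not latency.

```ruby
require 'date'

# Hypothetical in-memory stand-in for an object store; it only counts list requests.
class FakeStore
  PAGE_SIZE = 1000 # same page size as max_keys in the ruby script in this report

  attr_reader :requests

  def initialize(keys)
    @keys = keys.sort
    @requests = 0
  end

  # Flat listing: every key under the prefix, one request per page of keys.
  def list_flat(prefix)
    found = keys_under(prefix)
    count_pages(found.size)
    found
  end

  # Delimited listing: only the immediate children of the prefix.
  # Returns [sub_prefixes, files] and costs one request per page of entries.
  def list_children(prefix)
    dirs = []
    files = []
    keys_under(prefix).each do |key|
      rest = key[prefix.length..-1]
      slash = rest.index('/')
      if slash.nil?
        files << key
      else
        dir = prefix + rest[0..slash]
        # Keys are sorted, so entries of the same sub-prefix are adjacent.
        dirs << dir unless dirs.last == dir
      end
    end
    count_pages(dirs.size + files.size)
    [dirs, files]
  end

  private

  # Sorted keys sharing a prefix are contiguous, so binary-search for the start.
  def keys_under(prefix)
    i = @keys.bsearch_index { |k| k >= prefix } || @keys.size
    found = []
    while i < @keys.size && @keys[i].start_with?(prefix)
      found << @keys[i]
      i += 1
    end
    found
  end

  # Even an empty listing costs one request.
  def count_pages(entries)
    @requests += [(entries + PAGE_SIZE - 1) / PAGE_SIZE, 1].max
  end
end

# Walk the tree one directory at a time, as a recursive leaf-file listing would.
def list_recursive(store, prefix)
  dirs, files = store.list_children(prefix)
  files + dirs.flat_map { |dir| list_recursive(store, dir) }
end

# Assumed layout: one file per date/hour/verb combination.
def build_keys(dates:, hours:, verbs:)
  first_day = Date.new(2015, 1, 1)
  (0...dates).flat_map do |d|
    day = (first_day + d).iso8601
    (0...hours).flat_map do |h|
      verbs.map { |v| "events_v3/event_date=#{day}/event_hour=#{h}/verb=#{v}/part10000.parquet.gz" }
    end
  end
end

keys = build_keys(dates: 300, hours: 24, verbs: %w[follow click])

flat_store = FakeStore.new(keys)
flat_store.list_flat('events_v3/')

tree_store = FakeStore.new(keys)
list_recursive(tree_store, 'events_v3/')

puts "flat listing:      #{flat_store.requests} requests"
puts "recursive listing: #{tree_store.requests} requests"
```

Under these assumptions the flat listing needs 15 requests for 14,400 files, while the recursive walk needs 21,901 (1 for the root, 300 for the dates, 7,200 for the hours, 14,400 for the verb directories). Real request counts depend on the filesystem client, so treat the ratio as an illustration of why a per-directory walk can be much slower, not as a measurement.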
spark code:
{code:scala}
val sparkSession = org.apache.spark.sql.SparkSession.builder
  .config("spark.sql.hive.metastorePartitionPruning", true)
  .config("spark.sql.parquet.filterPushdown", true)
  .config("spark.sql.hive.verifyPartitionPath", false)
  .config("spark.sql.hive.convertMetastoreParquet.mergeSchema", false)
  .config("parquet.enable.summary-metadata", false)
  .config("spark.sql.sources.partitionDiscovery.enabled", false)
  .getOrCreate()

val df = sparkSession.read
  .option("mergeSchema", "false")
  .format("parquet")
  .load("s3n://bucket_name/events_v3")
df.createOrReplaceTempView("temp_events")

sparkSession.sql(
  """
    |select verb,count(*) from temp_events where event_date = "2016-08-05" group by verb
  """.stripMargin).show()
{code}
ruby code:
{code:ruby}
gem 'aws-sdk', '~> 2'
require 'aws-sdk'

client = Aws::S3::Client.new(:region=>'us-west-1')
next_continuation_token = nil
total = 0
loop do
  a = client.list_objects_v2({
    bucket: "bucket", # required
    max_keys: 1000,
    prefix: "events_v3/",
    continuation_token: next_continuation_token,
    fetch_owner: false,
  })
  puts a.contents.last.key
  total += a.contents.size
  next_continuation_token = a.next_continuation_token
  break unless a.is_truncated
end
puts "total"
puts total
{code}
I tried looking into the following bug, but hadoop 2.7.3 doesn't solve the problem:
https://issues.apache.org/jira/browse/HADOOP-12810
stackoverflow reference:
http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow

  was:
Let's say we have the following partitioned data:
{code}
events_v3
-- event_date=2015-01-01
-- event_hour=0
-- part10000.parquet.gz
-- event_date=2015-01-02
-- event_hour=5
-- part10000.parquet.gz
{code}
To read (or write) partitioned parquet data, Spark calls `ListingFileCatalog.listLeafFiles`, which recursively lists all files and folders.
In this case, if we had 300 dates, 300 jobs would be created, each trying to get the file list from one date directory.
This process takes about 10 minutes to finish (with 2 executors), whereas a ruby script that lists all files recursively under the same folder takes about 1 minute on the same machine with just 1 thread.
I am confused as to why listing the files takes so much longer in Spark.
spark code:
{code:scala}
val sparkSession = org.apache.spark.sql.SparkSession.builder
  .config("spark.sql.hive.metastorePartitionPruning", true)
  .config("spark.sql.parquet.filterPushdown", true)
  .config("spark.sql.hive.verifyPartitionPath", false)
  .config("spark.sql.hive.convertMetastoreParquet.mergeSchema", false)
  .config("parquet.enable.summary-metadata", false)
  .config("spark.sql.sources.partitionDiscovery.enabled", false)
  .getOrCreate()

val df = sparkSession.read
  .option("mergeSchema", "false")
  .format("parquet")
  .load("s3n://bucket_name/events_v3")
df.createOrReplaceTempView("temp_events")

sparkSession.sql(
  """
    |select verb,count(*) from temp_events where event_date = "2016-08-05" group by verb
  """.stripMargin).show()
{code}
ruby code:
{code:ruby}
gem 'aws-sdk', '~> 2'
require 'aws-sdk'

client = Aws::S3::Client.new(:region=>'us-west-1')
next_continuation_token = nil
total = 0
loop do
  a = client.list_objects_v2({
    bucket: "bucket", # required
    max_keys: 1000,
    prefix: "events_v3/",
    continuation_token: next_continuation_token,
    fetch_owner: false,
  })
  puts a.contents.last.key
  total += a.contents.size
  next_continuation_token = a.next_continuation_token
  break unless a.is_truncated
end
puts "total"
puts total
{code}
I tried looking into the following bug, but hadoop 2.7.3 doesn't solve the problem:
https://issues.apache.org/jira/browse/HADOOP-12810
stackoverflow reference:
http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow


> list files on s3 very slow
> --------------------------
>
>                 Key: SPARK-17593
>                 URL: https://issues.apache.org/jira/browse/SPARK-17593
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>         Environment: spark 2.0.0, hadoop 2.7.2 (hadoop 2.7.3)
>            Reporter: Gaurav Shah