Re: Opening many Parquet files = slow
Hi guys Regarding to parquet files. I have Spark 1.2.0 and reading 27 parquet files (250MB/file), it lasts 4 minutes. I have a cluster with 4 nodes and it seems me too slow. The load function is not available in Spark 1.2, so I can't test it Regards. Miguel. On Mon, Apr 13, 2015 at 8:12 PM, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi guys Does anyone know how to stop Spark from opening all Parquet files before starting a job? This is quite a show stopper for me, since I have 5000 Parquet files on S3. Recap of what I tried: 1. Disable schema merging with: sqlContext.load(“parquet, Map(mergeSchema - false”, path - “s3://path/to/folder)) This opens most files in the folder (17 out of 21 in my small example). For 5000 files on S3, sqlContext.load() takes 30 minutes to complete. 2. Use the old api with: sqlContext.setConf(spark.sql.parquet.useDataSourceApi, false”) Now sqlContext.parquetFile() only opens a few files and prints the schema: so far so good! However, as soon as I run e.g. a count() on the dataframe, Spark still opens all files _before_ starting a job/stage. Effectively this moves the delay from load() to count() (or any other action I presume). 3. Run Spark 1.3.1-rc2. sqlContext.load() took about 30 minutes for 5000 Parquet files on S3, the same as 1.3.0. Any help would be greatly appreciated! Thanks a lot. Eric On 10 Apr 2015, at 16:46, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi Ted Ah, I guess the term ‘source’ confused me :) Doing: sqlContext.load(“parquet, Map(mergeSchema - false”, path - “path to a single day of logs)) for 1 directory with 21 files, Spark opens 17 files: 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening ' s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading at position '261573524' 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening ' s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening ' s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening ' s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading at position '259256807' 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading at position '260002042' 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading at position ‘260875275' etc. I can’t seem to pass a comma-separated list of directories to load(), so in order to load multiple days of logs, I have to point to the root folder and depend on auto-partition discovery (unless there’s a smarter way). Doing: sqlContext.load(“parquet, Map(mergeSchema - false”, path - “path to root log dir)) starts opening what seems like all files (I killed the process after a couple of minutes). Thanks for helping out. Eric -- Saludos. Miguel Ángel
Re: Opening many Parquet files = slow
Hi guys Does anyone know how to stop Spark from opening all Parquet files before starting a job? This is quite a show stopper for me, since I have 5000 Parquet files on S3. Recap of what I tried: 1. Disable schema merging with: sqlContext.load(“parquet, Map(mergeSchema - false”, path - “s3://path/to/folder)) This opens most files in the folder (17 out of 21 in my small example). For 5000 files on S3, sqlContext.load() takes 30 minutes to complete. 2. Use the old api with: sqlContext.setConf(spark.sql.parquet.useDataSourceApi, false”) Now sqlContext.parquetFile() only opens a few files and prints the schema: so far so good! However, as soon as I run e.g. a count() on the dataframe, Spark still opens all files _before_ starting a job/stage. Effectively this moves the delay from load() to count() (or any other action I presume). 3. Run Spark 1.3.1-rc2. sqlContext.load() took about 30 minutes for 5000 Parquet files on S3, the same as 1.3.0. Any help would be greatly appreciated! Thanks a lot. Eric On 10 Apr 2015, at 16:46, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi Ted Ah, I guess the term ‘source’ confused me :) Doing: sqlContext.load(“parquet, Map(mergeSchema - false”, path - “path to a single day of logs)) for 1 directory with 21 files, Spark opens 17 files: 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading at position '261573524' 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading at position '259256807' 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading at position '260002042' 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading at position ‘260875275' etc. I can’t seem to pass a comma-separated list of directories to load(), so in order to load multiple days of logs, I have to point to the root folder and depend on auto-partition discovery (unless there’s a smarter way). Doing: sqlContext.load(“parquet, Map(mergeSchema - false”, path - “path to root log dir)) starts opening what seems like all files (I killed the process after a couple of minutes). Thanks for helping out. Eric
Re: Opening many Parquet files = slow
You may have seen this thread: http://search-hadoop.com/m/JW1q5SlRpt1 Cheers On Wed, Apr 8, 2015 at 6:15 AM, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi guys *I’ve got:* - 180 days of log data in Parquet. - Each day is stored in a separate folder in S3. - Each day consists of 20-30 Parquet files of 256 MB each. - Spark 1.3 on Amazon EMR This makes approximately 5000 Parquet files with a total size if 1.5 TB. *My code*: val in = sqlContext.parquetFile(“day1”, “day2”, …, “day180”) *Problem*: Before the very first stage is started, Spark spends about 25 minutes printing the following: ... 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189' 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading … etc It looks like Spark is opening each file, before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem. *Bonus info: * This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile(“/path/to/logsroot/)). What can I do to avoid this? Thanks in advance! Eric Eijkelenboom
Opening many Parquet files = slow
Hi guys I’ve got: 180 days of log data in Parquet. Each day is stored in a separate folder in S3. Each day consists of 20-30 Parquet files of 256 MB each. Spark 1.3 on Amazon EMR This makes approximately 5000 Parquet files with a total size if 1.5 TB. My code: val in = sqlContext.parquetFile(“day1”, “day2”, …, “day180”) Problem: Before the very first stage is started, Spark spends about 25 minutes printing the following: ... 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189' 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading … etc It looks like Spark is opening each file, before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem. Bonus info: This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile(“/path/to/logsroot/)). What can I do to avoid this? Thanks in advance! Eric Eijkelenboom
Re: Opening many Parquet files = slow
Thanks for the report. We improved the speed here in 1.3.1 so would be interesting to know if this helps. You should also try disabling schema merging if you do not need that feature (i.e. all of your files are the same schema). sqlContext.load(path, parquet, Map(mergeSchema - false)) On Wed, Apr 8, 2015 at 7:35 AM, Ted Yu yuzhih...@gmail.com wrote: You may have seen this thread: http://search-hadoop.com/m/JW1q5SlRpt1 Cheers On Wed, Apr 8, 2015 at 6:15 AM, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi guys *I’ve got:* - 180 days of log data in Parquet. - Each day is stored in a separate folder in S3. - Each day consists of 20-30 Parquet files of 256 MB each. - Spark 1.3 on Amazon EMR This makes approximately 5000 Parquet files with a total size if 1.5 TB. *My code*: val in = sqlContext.parquetFile(“day1”, “day2”, …, “day180”) *Problem*: Before the very first stage is started, Spark spends about 25 minutes printing the following: ... 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189' 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading … etc It looks like Spark is opening each file, before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem. *Bonus info: * This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile(“/path/to/logsroot/)). What can I do to avoid this? Thanks in advance! Eric Eijkelenboom
Re: Opening many Parquet files = slow
Hi Eric - Would you mind to try either disabling schema merging as what Michael suggested, or disabling the new Parquet data source by sqlContext.setConf(spark.sql.parquet.useDataSourceApi, false) Cheng On 4/9/15 2:43 AM, Michael Armbrust wrote: Thanks for the report. We improved the speed here in 1.3.1 so would be interesting to know if this helps. You should also try disabling schema merging if you do not need that feature (i.e. all of your files are the same schema). sqlContext.load(path, parquet, Map(mergeSchema - false)) On Wed, Apr 8, 2015 at 7:35 AM, Ted Yu yuzhih...@gmail.com mailto:yuzhih...@gmail.com wrote: You may have seen this thread: http://search-hadoop.com/m/JW1q5SlRpt1 Cheers On Wed, Apr 8, 2015 at 6:15 AM, Eric Eijkelenboom eric.eijkelenb...@gmail.com mailto:eric.eijkelenb...@gmail.com wrote: Hi guys *I’ve got:* * 180 days of log data in Parquet. * Each day is stored in a separate folder in S3. * Each day consists of 20-30 Parquet files of 256 MB each. * Spark 1.3 on Amazon EMR This makes approximately 5000 Parquet files with a total size if 1.5 TB. *My code*: val in = sqlContext.parquetFile(“day1”, “day2”, …, “day180”) *Problem*: Before the very first stage is started, Spark spends about 25 minutes printing the following: ... 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189' 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading … etc It looks like Spark is opening each file, before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem. *Bonus info: * This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile(“/path/to/logsroot/)). What can I do to avoid this? Thanks in advance! Eric Eijkelenboom
Re: Opening many Parquet files = slow
We noticed similar perf degradation using Parquet (outside of Spark) and it happened due to merging of multiple schemas. Would be good to know if disabling merge of schema (if the schema is same) as Michael suggested helps in your case. On Wed, Apr 8, 2015 at 11:43 AM, Michael Armbrust mich...@databricks.com wrote: Thanks for the report. We improved the speed here in 1.3.1 so would be interesting to know if this helps. You should also try disabling schema merging if you do not need that feature (i.e. all of your files are the same schema). sqlContext.load(path, parquet, Map(mergeSchema - false)) On Wed, Apr 8, 2015 at 7:35 AM, Ted Yu yuzhih...@gmail.com wrote: You may have seen this thread: http://search-hadoop.com/m/JW1q5SlRpt1 Cheers On Wed, Apr 8, 2015 at 6:15 AM, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi guys *I’ve got:* - 180 days of log data in Parquet. - Each day is stored in a separate folder in S3. - Each day consists of 20-30 Parquet files of 256 MB each. - Spark 1.3 on Amazon EMR This makes approximately 5000 Parquet files with a total size if 1.5 TB. *My code*: val in = sqlContext.parquetFile(“day1”, “day2”, …, “day180”) *Problem*: Before the very first stage is started, Spark spends about 25 minutes printing the following: ... 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189' 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading … etc It looks like Spark is opening each file, before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem. *Bonus info: * This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile(“/path/to/logsroot/)). What can I do to avoid this? Thanks in advance! Eric Eijkelenboom