Upgrade to parquet 1.6.0

2015-06-12 Thread Eric Eijkelenboom
Hi 

What is the reason that Spark still comes with Parquet 1.6.0rc3? Newer Parquet 
versions are available (e.g. 1.6.0), and upgrading would fix the problems with 
‘spark.sql.parquet.filterPushdown’, which is currently disabled by default 
because of a bug in Parquet 1.6.0rc3.

Thanks! 

Eric
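
For reference, once a Parquet release containing the fix is in place, the flag 
can simply be enabled per session. A minimal spark-shell sketch (the path and 
column name below are made up):

sqlContext.setConf("spark.sql.parquet.filterPushdown", "true") // disabled by default in Spark 1.3/1.4
val df = sqlContext.parquetFile("s3n://mylogs/")               // hypothetical path
df.filter(df("status") === 200).count()                        // this predicate should now be pushed down into the Parquet scan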

Re: Upgrade to parquet 1.6.0

2015-06-12 Thread Eric Eijkelenboom
Great, thanks for the extra info! 

 On 12 Jun 2015, at 12:41, Cheng Lian lian.cs@gmail.com wrote:
 
 At the time 1.3.x was released, 1.6.0 hadn't been released yet, and we didn't 
 have enough time to upgrade to and test Parquet 1.6.0 for Spark 1.4.0. But we've 
 already upgraded Parquet to 1.7.0 (which is exactly the same as 1.6.0, with the 
 package name renamed from com.twitter to org.apache.parquet) on the master 
 branch recently.
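
 For anyone managing the dependency directly, the rename amounts to a change of 
 Maven coordinates; roughly like this in sbt (a sketch, not taken from Spark's 
 own build files):

   // old Twitter coordinates (what Spark 1.3/1.4 depend on)
   libraryDependencies += "com.twitter" % "parquet-hadoop" % "1.6.0rc3"
   // new Apache coordinates (what the Spark master branch now uses)
   libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.7.0"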
 
 Cheng
 
 On 6/12/15 6:16 PM, Eric Eijkelenboom wrote:
 Hi 
 
 What is the reason that Spark still comes with Parquet 1.6.0rc3? Newer 
 Parquet versions are available (e.g. 1.6.0), and upgrading would fix the 
 problems with ‘spark.sql.parquet.filterPushdown’, which is currently 
 disabled by default because of a bug in Parquet 1.6.0rc3.
 
 Thanks! 
 
 Eric
 



Re: Parquet number of partitions

2015-05-07 Thread Eric Eijkelenboom
Funnily enough, I observe different behaviour on EC2 vs EMR (Spark on EMR 
installed with 
https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark). Both 
with Spark 1.3.1/Hadoop 2.

Reading a folder with 12 Parquet files gives me the following:

On EC2: 
scala> val logs = sqlContext.parquetFile("s3n://mylogs/")
...
scala> logs.rdd.partitions.length
15/05/07 11:45:50 INFO ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(125716) called with 
curMem=0, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 122.8 KB, free 265.3 MB)
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(19128) called with 
curMem=125716, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in 
memory (estimated size 18.7 KB, free 265.3 MB)
15/05/07 11:45:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
ip-10-31-82-233.ec2.internal:39894 (size: 18.7 KB, free: 265.4 MB)
15/05/07 11:45:51 INFO BlockManagerMaster: Updated info of block 
broadcast_0_piece0
15/05/07 11:45:51 INFO SparkContext: Created broadcast 0 from NewHadoopRDD at 
newParquet.scala:478
15/05/07 11:45:51 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side 
Metadata Split Strategy
res0: Int = 12

On EMR:
scala> val logs = sqlContext.parquetFile("s3n://mylogs/")
...
scala> logs.rdd.partitions.length
15/05/07 11:46:53 INFO parquet.ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(266059) called with 
curMem=287247, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1 stored as values 
in memory (estimated size 259.8 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(21188) called with 
curMem=553306, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as 
bytes in memory (estimated size 20.7 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in 
memory on ip-10-203-174-61.ec2.internal:52570 (size: 20.7 KB, free: 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerMaster: Updated info of block 
broadcast_1_piece0
15/05/07 11:46:53 INFO spark.SparkContext: Created broadcast 1 from 
NewHadoopRDD at newParquet.scala:478
15/05/07 11:46:54 INFO parquet.ParquetRelation2$$anon$1$$anon$2: Using Task 
Side Metadata Split Strategy
res3: Int = 138

138 (!) partitions on EMR and 12 partitions on EC2 (same as number of files). 
I’m reading from the exact same folder on S3.

This leads me to believe that there might be some configuration settings which 
control how partitioning happens. Could that be the case?
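
If it is, a first step could be to compare the split-related Hadoop settings on 
both clusters from the shell. A sketch (which of these properties actually 
differs between EC2 and EMR is a guess on my part):

scala> sc.hadoopConfiguration.get("fs.s3n.block.size")     // S3 block size used when computing splits
scala> sc.hadoopConfiguration.get("mapred.max.split.size") // upper bound on split size, if set
scala> sc.hadoopConfiguration.get("mapred.min.split.size") // lower bound on split size, if set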

Insights would be greatly appreciated. 

Best, Eric



 On 07 May 2015, at 09:31, Archit Thakur archit279tha...@gmail.com wrote:
 
 Hi.
 The number of partitions is determined by the RDD used in the plan it creates. 
 It uses NewHadoopRDD, which gets its partitions from getSplits of the input 
 format it is using, in this case FilteringParquetRowInputFormat, a subclass of 
 ParquetInputFormat. To change the number of partitions, write a new input 
 format and make NewHadoopRDD use it in the plan, or, if you are willing to 
 shuffle, you can use the repartition API without changing any input format 
 code.
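
 A quick sketch of the shuffle-based route in the shell (the path is 
 hypothetical and 200 is an arbitrary target):

   val df = sqlContext.parquetFile("s3n://mylogs/")   // hypothetical path
   val repartitioned = df.rdd.repartition(200)        // triggers a shuffle into exactly 200 partitions
   repartitioned.partitions.length                    // => 200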
 
 Thanks & Regards.
 
 On Tue, May 5, 2015 at 7:56 PM, Masf masfwo...@gmail.com wrote:
 Hi Eric.
 
 Q1:
 In my experience, Spark generates as many partitions as there are Parquet 
 files in the path.
 
 Q2:
 To reduce the number of partitions you can use rdd.repartition(x), where x is 
 the desired number of partitions. Depending on your case, repartition can be a 
 heavy operation.
 
 
 Regards.
 Miguel.
 
 On Tue, May 5, 2015 at 3:56 PM, Eric Eijkelenboom 
 eric.eijkelenb...@gmail.com wrote:
 Hello guys
 
 Q1: How does Spark determine the number of partitions when reading a Parquet 
 file?
 
 val df = sqlContext.parquetFile(path)
 
 Is it somehow related to the number of Parquet row groups in my input?
 
 Q2: How can I reduce this number of partitions? Doing this:
 
 df.rdd.coalesce(200).count
 
 from the spark-shell causes job execution to hang…
 
 Any ideas? Thank you in advance.
 
 Eric
 
 
 
 
 -- 
 
 
 Regards.
 Miguel Ángel
 



Parquet number of partitions

2015-05-05 Thread Eric Eijkelenboom
Hello guys

Q1: How does Spark determine the number of partitions when reading a Parquet 
file?

val df = sqlContext.parquetFile(path)

Is it somehow related to the number of Parquet row groups in my input?

Q2: How can I reduce this number of partitions? Doing this:

df.rdd.coalesce(200).count

from the spark-shell causes job execution to hang… 

Any ideas? Thank you in advance. 

Eric



Re: Opening many Parquet files = slow

2015-04-13 Thread Eric Eijkelenboom
Hi guys

Does anyone know how to stop Spark from opening all Parquet files before 
starting a job? This is quite a show stopper for me, since I have 5000 Parquet 
files on S3.

Recap of what I tried: 

1. Disable schema merging with: 
sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "s3://path/to/folder"))
This opens most files in the folder (17 out of 21 in my small example). For 
5000 files on S3, sqlContext.load() takes 30 minutes to complete. 

2. Use the old API with: 
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
Now sqlContext.parquetFile() only opens a few files and prints the schema: 
so far so good! However, as soon as I run e.g. a count() on the dataframe, 
Spark still opens all files _before_ starting a job/stage. Effectively this 
moves the delay from load() to count() (or any other action, I presume). A 
short sketch of this attempt follows the list.

3. Run Spark 1.3.1-rc2.
sqlContext.load() took about 30 minutes for 5000 Parquet files on S3, the 
same as 1.3.0.
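
To make attempt 2 concrete, the shell session looks roughly like this (the S3 
path is made up):

sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
val logs = sqlContext.parquetFile("s3n://mylogs/")   // returns quickly and prints the schema
logs.count()                                         // the files are still all opened here, before the first stage starts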

Any help would be greatly appreciated!

Thanks a lot. 

Eric




 On 10 Apr 2015, at 16:46, Eric Eijkelenboom eric.eijkelenb...@gmail.com 
 wrote:
 
 Hi Ted
 
 Ah, I guess the term ‘source’ confused me :)
 
 Doing:
 
 sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to a single day of logs")) 
 
 for 1 directory with 21 files, Spark opens 17 files: 
 
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading at position '261573524'
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading at position '259256807'
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading at position '260002042'
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading at position '260875275'
 etc.
 
 I can’t seem to pass a comma-separated list of directories to load(), so in 
 order to load multiple days of logs, I have to point to the root folder and 
 depend on auto-partition discovery (unless there’s a smarter way). 
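
 One possible workaround (a sketch, untested, with made-up day paths): load 
 each day separately and union the results, which keeps partition discovery 
 out of the picture:

   val days = Seq("s3n://mylogs/logs/day1", "s3n://mylogs/logs/day2")   // hypothetical paths
   val df = days
     .map(d => sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> d)))
     .reduce(_ unionAll _)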
 
 Doing: 
 
 sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to root log dir")) 
 
 starts opening what seems like all files (I killed the process after a couple 
 of minutes).
 
 Thanks for helping out. 
 Eric



Opening many Parquet files = slow

2015-04-08 Thread Eric Eijkelenboom
Hi guys

I’ve got:
- 180 days of log data in Parquet.
- Each day is stored in a separate folder in S3.
- Each day consists of 20-30 Parquet files of 256 MB each.
- Spark 1.3 on Amazon EMR.
This makes approximately 5000 Parquet files with a total size of 1.5 TB.

My code: 
val in = sqlContext.parquetFile("day1", "day2", …, "day180")

Problem: 
Before the very first stage is started, Spark spends about 25 minutes printing 
the following:
...
15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 
'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for 
reading at position '258305902'
15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 
'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for 
reading at position '260897108'
15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening 
's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124'
 for reading
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 
'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for 
reading at position '261259189'
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 
's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102'
 for reading
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 
's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60'
 for reading
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 
's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73'
 for reading
… etc

It looks like Spark is opening each file before it actually does any work, 
which means a delay of 25 minutes when working with Parquet files. Previously, 
we used LZO files and did not experience this problem.

Bonus info: 
This also happens when I use auto partition discovery (i.e. 
sqlContext.parquetFile("/path/to/logsroot/")).

What can I do to avoid this? 

Thanks in advance! 

Eric Eijkelenboom