[
https://issues.apache.org/jira/browse/SPARK-8597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14602009#comment-14602009
]
Michael Armbrust commented on SPARK-8597:
-----------------------------------------
Parquet allocates fairly large buffers for each open file, so this is expected.
Doing something smarter here is pretty high on my 1.5 priority list though.
Some possible options:
- Use parquets memory management to make sure it scales down the buffer size
when needed
- Limit the number of open files
- (my favorite) Order data before writing out partitioning so that we only
need to have one file open at a time.
The last one is more expensive at write time, but would result in the
fewest/largest files which is optimal for reading.
> DataFrame partitionBy memory pressure scales extremely poorly
> -------------------------------------------------------------
>
> Key: SPARK-8597
> URL: https://issues.apache.org/jira/browse/SPARK-8597
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.4.0
> Reporter: Matt Cheah
> Priority: Blocker
> Attachments: table.csv
>
>
> I'm running into a strange memory scaling issue when using the partitionBy
> feature of DataFrameWriter.
> I've generated a table (a CSV file) with 3 columns (A, B and C) and 32*32
> different entries, with size on disk of about 20kb. There are 32 distinct
> values for column A and 32 distinct values for column B and all these are
> combined together (column C will contain a random number for each row - it
> doesn't matter) producing a 32*32 elements data set. I've imported this into
> Spark and I ran a partitionBy("A", "B") in order to test its performance.
> This should create a nested directory structure with 32 folders, each of them
> containing another 32 folders. It uses about 10Gb of RAM and it's running
> slow. If I increase the number of entries in the table from 32*32 to 128*128,
> I get Java Heap Space Out Of Memory no matter what value I use for Heap Space
> variable.
> Scala code:
> {code}
> var df = sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").load("table.csv")
> df.write.partitionBy("A", "B").mode("overwrite").parquet("table.parquet”)
> {code}
> How I ran the Spark shell:
> {code}
> bin/spark-shell --driver-memory 16g --master local[8] --packages
> com.databricks:spark-csv_2.10:1.0.3
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]