[
https://issues.apache.org/jira/browse/SPARK-15419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Joseph K. Bradley updated SPARK-15419:
--------------------------------------
Description:
When monotonicallyIncreasingId is used on a DataFrame with many partitions, it
uses a very large amount of memory.
Consider this code:
{code}
import org.apache.spark.sql.functions._
// JMAP1: run jmap -histo:live [PID]
val numPartitions = 1000
val df = spark.range(0, 1000000, 1, numPartitions).toDF("vtx")
df.cache().count()
// JMAP2: run jmap -histo:live [PID]
val df2 = df.withColumn("id", monotonicallyIncreasingId())
df2.cache().count()
// JMAP3: run jmap -histo:live [PID]
{code}
Here's how memory usage progresses:
* JMAP1: This is just for calibration.
* JMAP2: No significant change from 1.
* JMAP3: Massive jump: 3048895 Longs, 1039638 Objects, 2007427 Integers,
1002000 org.apache.spark.sql.catalyst.expressions.GenericInternalRow
** None of these had significant numbers of instances in JMAP1/2.
When the indexed DataFrame is used repeatedly afterwards, the driver memory
usage keeps increasing and eventually blows up in my application.
I wrote "with multiple partitions" because this issue goes away when
numPartitions is small (1 or 2).
Presumably this memory usage could be reduced.
Note: I also tested a custom indexing using RDD.zipWithIndex, and it is even
worse in terms of object creation (about 2x worse):
{code}
def zipWithUniqueIdFrom0(df: DataFrame): DataFrame = {
val sqlContext = df.sqlContext
val schema = df.schema
val outputSchema = StructType(Seq(
StructField("row", schema, false), StructField("id", DataTypes.IntegerType,
false)))
val rdd = df.rdd.zipWithIndex().map { case (row: Row, id: Long) => Row(row,
id.toInt) }
sqlContext.createDataFrame(rdd, outputSchema)
}
// val df2 = df.withColumn("id", monotonicallyIncreasingId())
val df2 = zipWithUniqueIdFrom0(df)
df2.cache().count()
{code}
was:
When monotonicallyIncreasingId is used on a DataFrame with many partitions, it
uses a very large amount of memory.
Consider this code:
{code}
import org.apache.spark.sql.functions._
// JMAP1: run jmap -histo:live [PID]
val numPartitions = 1000
val df = spark.range(0, 1000000, 1, numPartitions).toDF("vtx")
df.cache().count()
// JMAP2: run jmap -histo:live [PID]
val df2 = df.withColumn("id", monotonicallyIncreasingId())
df2.cache().count()
// JMAP3: run jmap -histo:live [PID]
{code}
Here's how memory usage progresses:
* JMAP1: This is just for calibration.
* JMAP2: No significant change from 1.
* JMAP3: Massive jump: 3048895 Longs, 1039638 Objects, 2007427 Integers,
1002000 org.apache.spark.sql.catalyst.expressions.GenericInternalRow
** None of these had significant numbers of instances in JMAP1/2.
When the indexed DataFrame is used repeatedly afterwards, the driver memory
usage keeps increasing and eventually blows up in my application.
I wrote "with multiple partitions" because this issue goes away when
numPartitions is small (1 or 2).
Presumably this memory usage could be reduced.
> monotonicallyIncreasingId should use less memory with multiple partitions
> -------------------------------------------------------------------------
>
> Key: SPARK-15419
> URL: https://issues.apache.org/jira/browse/SPARK-15419
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.0.0
> Environment: branch-2.0, 1 worker
> Reporter: Joseph K. Bradley
>
> When monotonicallyIncreasingId is used on a DataFrame with many partitions,
> it uses a very large amount of memory.
> Consider this code:
> {code}
> import org.apache.spark.sql.functions._
> // JMAP1: run jmap -histo:live [PID]
> val numPartitions = 1000
> val df = spark.range(0, 1000000, 1, numPartitions).toDF("vtx")
> df.cache().count()
> // JMAP2: run jmap -histo:live [PID]
> val df2 = df.withColumn("id", monotonicallyIncreasingId())
> df2.cache().count()
> // JMAP3: run jmap -histo:live [PID]
> {code}
> Here's how memory usage progresses:
> * JMAP1: This is just for calibration.
> * JMAP2: No significant change from 1.
> * JMAP3: Massive jump: 3048895 Longs, 1039638 Objects, 2007427 Integers,
> 1002000 org.apache.spark.sql.catalyst.expressions.GenericInternalRow
> ** None of these had significant numbers of instances in JMAP1/2.
> When the indexed DataFrame is used repeatedly afterwards, the driver memory
> usage keeps increasing and eventually blows up in my application.
> I wrote "with multiple partitions" because this issue goes away when
> numPartitions is small (1 or 2).
> Presumably this memory usage could be reduced.
> Note: I also tested a custom indexing using RDD.zipWithIndex, and it is even
> worse in terms of object creation (about 2x worse):
> {code}
> def zipWithUniqueIdFrom0(df: DataFrame): DataFrame = {
> val sqlContext = df.sqlContext
> val schema = df.schema
> val outputSchema = StructType(Seq(
> StructField("row", schema, false), StructField("id",
> DataTypes.IntegerType, false)))
> val rdd = df.rdd.zipWithIndex().map { case (row: Row, id: Long) => Row(row,
> id.toInt) }
> sqlContext.createDataFrame(rdd, outputSchema)
> }
> // val df2 = df.withColumn("id", monotonicallyIncreasingId())
> val df2 = zipWithUniqueIdFrom0(df)
> df2.cache().count()
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]