[ https://issues.apache.org/jira/browse/SPARK-15419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joseph K. Bradley updated SPARK-15419:
--------------------------------------
    Description: 
When monotonicallyIncreasingId is used on a DataFrame with many partitions, it 
uses a very large amount of memory.

Consider this code:
{code}
import org.apache.spark.sql.functions._

// JMAP1: run jmap -histo:live [PID]

val numPartitions = 1000
val df = spark.range(0, 1000000, 1, numPartitions).toDF("vtx")
df.cache().count()

// JMAP2

val df2 = df.withColumn("id", monotonicallyIncreasingId())
df2.cache().count()

// JMAP3

df2.select(col("id") + 1).count()

// JMAP4
{code}

Here's how memory usage progresses:
* JMAP1: This is just for calibration.
* JMAP2: No significant change from 1.
* JMAP3: Massive jump: 3048895 Longs, 1039638 Objects, 2007427 Integers, 1002000 org.apache.spark.sql.catalyst.expressions.GenericInternalRow
** None of these had significant numbers of instances in JMAP1/2.
* JMAP4: This doubles the object creation. I.e., even after caching, it keeps generating new objects on every use.
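The JMAP snapshots above are compared by eye. A minimal sketch for diffing two saved `jmap -histo:live` outputs and listing the classes whose live-instance counts grew, assuming the JDK 8-style text layout (data rows of the form `rank: instances bytes class-name`). `HistoDiff`, `parse`, and `growth` are hypothetical names, not part of Spark or the JDK:

```scala
// Hypothetical helper (not part of Spark or the JDK): parses text printed by
// `jmap -histo:live <PID>` and reports per-class instance-count growth.
object HistoDiff {
  // Assumed data-row layout: "   1:       3048895       73173480  java.lang.Long".
  // Header, separator, and "Total" lines do not match and are skipped.
  private val HistoLine = """^\s*\d+:\s+(\d+)\s+(\d+)\s+(\S+).*$""".r

  /** Maps class name -> live instance count, summing repeated class names. */
  def parse(histo: String): Map[String, Long] =
    histo.split("\\r?\\n").toSeq
      .collect { case HistoLine(instances, _, className) => (className, instances.toLong) }
      .groupBy(_._1)
      .map { case (cls, pairs) => cls -> pairs.map(_._2).sum }

  /** Classes whose count grew by at least `minDelta`, largest growth first. */
  def growth(before: Map[String, Long], after: Map[String, Long], minDelta: Long): Seq[(String, Long)] =
    after.toSeq
      .map { case (cls, n) => cls -> (n - before.getOrElse(cls, 0L)) }
      .filter(_._2 >= minDelta)
      .sortBy(-_._2)
}
```

Saving each snapshot to a file and calling `HistoDiff.growth(HistoDiff.parse(snap2), HistoDiff.parse(snap3), minDelta)` would list the JMAP2-to-JMAP3 jump without manual comparison; the threshold is a judgment call.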

When the indexed DataFrame is used repeatedly afterwards, the driver memory 
usage keeps increasing and eventually blows up in my application.

I wrote "with multiple partitions" because this issue goes away when 
numPartitions is small (1 or 2).
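The partition count enters the generated IDs directly. Per Spark's API documentation for `monotonically_increasing_id`, the partition ID occupies the upper 31 bits and the record number within the partition the lower 33 bits, so IDs are unique and increasing but not consecutive. The arithmetic can be sketched in plain Scala; `MonotonicIdLayout` is a hypothetical name, and this illustrates only the documented layout, not Spark's expression code or the cause of the memory growth:

```scala
// Sketch of the documented ID layout of monotonically_increasing_id():
// partition index in the upper 31 bits, per-partition record number in the
// lower 33 bits. MonotonicIdLayout is a hypothetical name, not Spark code.
object MonotonicIdLayout {
  val RecordBits = 33

  // ID of the recordNumber-th row (0-based) in partition partitionIndex.
  def id(partitionIndex: Int, recordNumber: Long): Long =
    (partitionIndex.toLong << RecordBits) + recordNumber

  // Recover the two components from an ID.
  def partitionOf(id: Long): Int = (id >>> RecordBits).toInt
  def recordOf(id: Long): Long = id & ((1L << RecordBits) - 1)
}
```

With this layout the IDs jump by 2^33 at every partition boundary, so with 1000 partitions the values span a much wider range than the row count, even though each partition's IDs are contiguous.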

Presumably this memory usage could be reduced.

Note: I also tested a custom indexing using RDD.zipWithIndex, and it is even 
worse in terms of object creation (about 2x worse):
{code}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

// Pairs each row with a 0-based index from RDD.zipWithIndex.
def zipWithUniqueIdFrom0(df: DataFrame): DataFrame = {
  val sqlContext = df.sqlContext
  val schema = df.schema
  val outputSchema = StructType(Seq(
    StructField("row", schema, false), StructField("id", DataTypes.IntegerType, false)))
  val rdd = df.rdd.zipWithIndex().map { case (row: Row, id: Long) => Row(row, id.toInt) }
  sqlContext.createDataFrame(rdd, outputSchema)
}

// val df2 = df.withColumn("id", monotonicallyIncreasingId())
val df2 = zipWithUniqueIdFrom0(df)
df2.cache().count()
{code}
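`RDD.zipWithIndex` (used above) and the similarly named `RDD.zipWithUniqueId` number rows differently. Per their Scaladoc, `zipWithIndex` orders by partition index, then by position within the partition, and triggers an extra Spark job when the RDD has more than one partition (to learn partition sizes); `zipWithUniqueId` gives items in partition k the ids k, n+k, 2n+k, ... (n = number of partitions), which can leave gaps but needs no extra job. A local sketch of both numbering schemes, using `Seq[Seq[A]]` as a hypothetical stand-in for an RDD's partitions (`RddIndexSketch` is not Spark's implementation):

```scala
// Local simulation of the two RDD indexing schemes. Seq[Seq[A]] stands in
// for an RDD's partitions; RddIndexSketch is a hypothetical name.
object RddIndexSketch {
  // zipWithIndex: index = rows in all earlier partitions + position in partition.
  // The per-partition counts must be known first, hence Spark's extra job.
  def zipWithIndex[A](partitions: Seq[Seq[A]]): Seq[Seq[(A, Long)]] = {
    val starts = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
    partitions.zip(starts).map { case (part, start) =>
      part.zipWithIndex.map { case (a, i) => (a, start + i) }
    }
  }

  // zipWithUniqueId: item i of partition k gets i * n + k (n = partition count).
  // No counts are needed, but the ids can have gaps.
  def zipWithUniqueId[A](partitions: Seq[Seq[A]]): Seq[Seq[(A, Long)]] = {
    val n = partitions.size.toLong
    partitions.zipWithIndex.map { case (part, k) =>
      part.zipWithIndex.map { case (a, i) => (a, i * n + k) }
    }
  }
}
```

The helper in the issue then narrows the `Long` index with `id.toInt`, which is safe for the 1,000,000-row repro but would silently wrap for more than `Int.MaxValue` rows; `java.lang.Math.toIntExact` throws an `ArithmeticException` instead.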


  was:
When monotonicallyIncreasingId is used on a DataFrame with many partitions, it 
uses a very large amount of memory.

Consider this code:
{code}
import org.apache.spark.sql.functions._

// JMAP1: run jmap -histo:live [PID]

val numPartitions = 1000
val df = spark.range(0, 1000000, 1, numPartitions).toDF("vtx")
df.cache().count()

// JMAP2: run jmap -histo:live [PID]

val df2 = df.withColumn("id", monotonicallyIncreasingId())
df2.cache().count()

// JMAP3: run jmap -histo:live [PID]
{code}

Here's how memory usage progresses:
* JMAP1: This is just for calibration.
* JMAP2: No significant change from 1.
* JMAP3: Massive jump: 3048895 Longs, 1039638 Objects, 2007427 Integers, 
1002000 org.apache.spark.sql.catalyst.expressions.GenericInternalRow
** None of these had significant numbers of instances in JMAP1/2.

When the indexed DataFrame is used repeatedly afterwards, the driver memory 
usage keeps increasing and eventually blows up in my application.

I wrote "with multiple partitions" because this issue goes away when 
numPartitions is small (1 or 2).

Presumably this memory usage could be reduced.

Note: I also tested a custom indexing using RDD.zipWithIndex, and it is even 
worse in terms of object creation (about 2x worse):
{code}
def zipWithUniqueIdFrom0(df: DataFrame): DataFrame = {
  val sqlContext = df.sqlContext
  val schema = df.schema
  val outputSchema = StructType(Seq(
    StructField("row", schema, false), StructField("id", DataTypes.IntegerType, false)))
  val rdd = df.rdd.zipWithIndex().map { case (row: Row, id: Long) => Row(row, id.toInt) }
  sqlContext.createDataFrame(rdd, outputSchema)
}

// val df2 = df.withColumn("id", monotonicallyIncreasingId())
val df2 = zipWithUniqueIdFrom0(df)
df2.cache().count()
{code}



> monotonicallyIncreasingId should use less memory with multiple partitions
> -------------------------------------------------------------------------
>
>                 Key: SPARK-15419
>                 URL: https://issues.apache.org/jira/browse/SPARK-15419
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.0
>         Environment: branch-2.0, 1 worker
>            Reporter: Joseph K. Bradley
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
