Hi, I posted this to the users list but didn't get any responses.
I just wanted to highlight what seems like excessive memory use when using
sliding windows.
I have attached a test case where, starting from well under 1 MB of data,
I can OOM a 10 GB heap.

Regards,
-JD



--------------

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.junit.Test
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import scala.collection.mutable.ArrayBuffer


/**
 * A small unit test to demonstrate an OOM with Spark window functions.
 */
class SparkTest {


  @Test
  def testWindows() {
    val sparkSession = SparkSession.builder()
      .master("local[7]")
      .appName("tests")
      .getOrCreate()
    import sparkSession.implicits._

    println("Init Dataset")

    val partitions = (0 until 4)
    val entries = (0 until 6500)

    //val windows = (5 to 15 by 5) // works
    val windows = (5 to 65 by 5)   // OOMs a 10G heap

    val testData = new ArrayBuffer[(String,Timestamp,Double)]


    // 4 keys x 6,500 rows of (key, timestamp, value), well under 1 MB of data.
    for (p <- partitions; e <- entries) {
      testData += (("Key" + p, new Timestamp(60000 * e), e * 2.0))
    }

    val ds = testData.toDF("key","datetime","value")
    ds.show()


    var resultFrame = ds
    resultFrame.schema.fields.foreach(println)


    val baseWin = Window.partitionBy("key").orderBy("datetime")
    // For each window size, add avg/stddev/min/max over a sliding frame of the
    // current row and the preceding `win` rows.
    for (win <- windows) {
      resultFrame = resultFrame
        .withColumn("avg" + (-win), avg("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("stddev" + (-win), stddev("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("min" + (-win), min("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("max" + (-win), max("value").over(baseWin.rowsBetween(-win, 0)))
    }
    resultFrame.show()

  }

}
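
For what it's worth, the same columns can also be expressed as a single select
over a list of window expressions instead of the chained withColumn calls in
the loop above. This fragment would go inside testWindows (it reuses ds,
baseWin and windows from the test); I have not checked whether it changes the
memory behaviour, it is only shown for comparison.

    // Alternative formulation, not verified against the OOM: build every
    // window column up front and apply them all in one select.
    val windowCols = windows.flatMap { win =>
      val frame = baseWin.rowsBetween(-win, 0)
      Seq(
        avg("value").over(frame).as("avg" + (-win)),
        stddev("value").over(frame).as("stddev" + (-win)),
        min("value").over(frame).as("min" + (-win)),
        max("value").over(frame).as("max" + (-win))
      )
    }
    val singleSelect = ds.select(col("*") +: windowCols.toSeq: _*)
    singleSelect.show()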

