Hi, I posted this to the users list but didn't get any responses. I just wanted to highlight what seems like excessive memory use when using sliding windows. I have attached a test case where, starting with well under 1 MB of data, I can OOM a 10 GB heap.
Regards,
-JD

--------------

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.junit.Test

import scala.collection.mutable.ArrayBuffer

/**
 * A small unit test to demonstrate a Spark window functions OOM.
 */
class SparkTest {

  @Test
  def testWindows() {
    val sparkSession = SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
    import sparkSession.implicits._

    println("Init Dataset")

    val partitions = (0 until 4)
    val entries = (0 until 6500)
    //val windows = (5 to 15 by 5) // Works
    val windows = (5 to 65 by 5)   // OOMs a 10G heap

    val testData = new ArrayBuffer[(String, Timestamp, Double)]
    for (p <- partitions) {
      for (e <- entries) {
        testData += (("Key" + p, new Timestamp(60000 * e), e * 2.0))
      }
    }

    val ds = testData.toDF("key", "datetime", "value")
    ds.show()

    var resultFrame = ds
    resultFrame.schema.fields.foreach(println)

    val baseWin = Window.partitionBy("key").orderBy("datetime")
    for (win <- windows) {
      resultFrame = resultFrame
        .withColumn("avg" + (-win), avg("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("stddev" + (-win), stddev("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("min" + (-win), min("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("max" + (-win), max("value").over(baseWin.rowsBetween(-win, 0)))
    }
    resultFrame.show()
  }
}
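For what it's worth, here is a sketch of the same columns built in a single select() instead of chained withColumn() calls, in case the repeated projections contribute to the memory growth. This is only my speculation; it reuses ds, baseWin, and windows from the test above, and I have not verified that it avoids the OOM.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// Build every windowed aggregate expression up front, one frame per window width.
val aggExprs: Seq[Column] = windows.flatMap { win =>
  val frame = baseWin.rowsBetween(-win, 0)
  Seq(
    avg("value").over(frame).as("avg" + (-win)),
    stddev("value").over(frame).as("stddev" + (-win)),
    min("value").over(frame).as("min" + (-win)),
    max("value").over(frame).as("max" + (-win))
  )
}

// Add all of the columns in one projection rather than one withColumn per column.
val singleSelectFrame = ds.select(col("*") +: aggExprs: _*)
singleSelectFrame.show()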