[
https://issues.apache.org/jira/browse/SPARK-23427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367742#comment-16367742
]
Pratik Dhumal edited comment on SPARK-23427 at 2/16/18 7:18 PM:
----------------------------------------------------------------
{code:java}
// Reproduction for SPARK-23427 (driver memory growth when
// spark.sql.autoBroadcastJoinThreshold is enabled).
// Each iteration joins the current t1 against four small persisted
// DataFrames through temp views, unions the result onto t1, re-caches t1 and
// prints the estimated size of t1 in MB.
// NOTE(review): the loop is `while (true)` with no exit, so this test runs
// until the process is stopped or fails (e.g. with the OOM under
// investigation); the unpersist calls and println("VOID") after the loop
// are never reached.
@Test
def testLoop() = {
// Single integer column "test", shared by all five input DataFrames.
val schema = new StructType().add("test", types.IntegerType)
// t1 is a var because every iteration replaces it with a larger DataFrame.
var t1 = spark.createDataFrame(spark.sparkContext.parallelize(1 to
1000000).map(i => Row(i)), schema)
// t2..t5 are small tables over overlapping integer ranges (at most ~1400 rows).
val t2 = spark.createDataFrame(spark.sparkContext.parallelize(4 to
1400).map(i => Row(i)), schema)
val t3 = spark.createDataFrame(spark.sparkContext.parallelize(15 to
190).map(i => Row(i)), schema)
val t4 = spark.createDataFrame(spark.sparkContext.parallelize(135 to
652).map(i => Row(i)), schema)
val t5 = spark.createDataFrame(spark.sparkContext.parallelize(86 to
352).map(i => Row(i)), schema)
// Cache every input and force materialization with count().
t1.persist().count()
t2.persist().count()
t3.persist().count()
t4.persist().count()
t5.persist().count()
// NOTE(review): dfResult is declared but never used.
var dfResult: DataFrame = null
while (true) {
// Keep only odd values of t3 and only even values of t4.
var t3Filter = t3.filter("test % 2 = 1")
var t4Filter = t4.filter("test % 2 = 0")
// (Re)register the inputs as temp views so the query sees the current t1.
t1.createOrReplaceTempView("T1")
t2.createOrReplaceTempView("T2")
t3Filter.createOrReplaceTempView("T3")
t4Filter.createOrReplaceTempView("T4")
t5.createOrReplaceTempView("T5")
// Inner join T1 with T2, then left join T3..T5, keeping only T1's columns.
// T2..T5 are small, so these joins are presumably the broadcast candidates
// governed by autoBroadcastJoinThreshold - confirm with the physical plan.
// NOTE(review): table references mix case (T1 vs t2..t5); this presumably
// relies on case-insensitive resolution (spark.sql.caseSensitive=false).
var query =
""" SELECT T1.* FROM T1
| INNER JOIN T2 ON T1.test=t2.test
| LEFT JOIN T3 ON T1.test=t3.test
| LEFT JOIN T4 ON T1.test=t4.test
| LEFT JOIN T5 ON T1.test=t5.test
| """.stripMargin
// NOTE(review): t1 is non-null before the loop and is only ever reassigned
// to DataFrame results, so this branch is dead; only the else branch runs.
if (t1 == null) {
t1 = spark.sql(query)
t1.persist().count()
} else {
// New t1 = this iteration's join result UNION the previous t1. The new t1 is
// cached and materialized with count() before the previous t1 (tmp2) is
// unpersisted.
// NOTE(review): the previous t1 appears twice in the new plan (via view T1
// inside tmp1, and directly in the union), so t1's logical plan grows every
// iteration. Size growth may therefore reflect plan/lineage growth, not only
// broadcast data - confirm.
var tmp1 = spark.sql(query)
var tmp2 = t1
t1 = tmp1.union(tmp2)
t1.persist().count()
tmp2.unpersist(true)
tmp2 = null
}
// Print the estimated size of t1 in whole MB (integer division).
// NOTE(review): SizeEstimator presumably measures the JVM object graph
// reachable from the Dataset object (plan, session references), not the
// cached data itself - confirm.
println("t1 : " + (SizeEstimator.estimate(t1) / 1024 / 1024))
// Placeholder for per-iteration work; intentionally empty in this reproduction.
// Drop the temp views; they are re-created at the start of the next iteration.
spark.catalog.dropTempView("T1")
spark.catalog.dropTempView("T2")
spark.catalog.dropTempView("T3")
spark.catalog.dropTempView("T4")
spark.catalog.dropTempView("T5")
}
// NOTE(review): unreachable - the while (true) loop above never exits.
t3.unpersist(true)
t2.unpersist(true)
t1.unpersist(true)
t4.unpersist(true)
t5.unpersist(true)
println("VOID")
}
// RESULT LOG
t1 : 8
t1 : 208
t1 : 310
t1 : 187
t1 : 441
t1 : 440
t1 : 547
t1 : 651
t1 : 759
t1 : 733
t1 : 1129{code}
Hope this helps.
was (Author: dpratik):
{code:java}
// Reproduction for SPARK-23427 (driver memory growth when
// spark.sql.autoBroadcastJoinThreshold is enabled).
// Each iteration joins the current t1 against four small persisted
// DataFrames through temp views, unions the result onto t1, re-caches t1 and
// prints the estimated size of t1 in MB.
// NOTE(review): the loop is `while (true)` with no exit, so this test runs
// until the process is stopped or fails (e.g. with the OOM under
// investigation); the unpersist calls and println("VOID") after the loop
// are never reached.
@Test
def testLoop() = {
// Single integer column "test", shared by all five input DataFrames.
val schema = new StructType().add("test", types.IntegerType)
// t1 is a var because every iteration replaces it with a larger DataFrame.
var t1 = spark.createDataFrame(spark.sparkContext.parallelize(1 to
1000000).map(i => Row(i)), schema)
// t2..t5 are small tables over overlapping integer ranges (at most ~1400 rows).
val t2 = spark.createDataFrame(spark.sparkContext.parallelize(4 to
1400).map(i => Row(i)), schema)
val t3 = spark.createDataFrame(spark.sparkContext.parallelize(15 to
190).map(i => Row(i)), schema)
val t4 = spark.createDataFrame(spark.sparkContext.parallelize(135 to
652).map(i => Row(i)), schema)
val t5 = spark.createDataFrame(spark.sparkContext.parallelize(86 to
352).map(i => Row(i)), schema)
// Cache every input and force materialization with count().
t1.persist().count()
t2.persist().count()
t3.persist().count()
t4.persist().count()
t5.persist().count()
// NOTE(review): dfResult is declared but never used.
var dfResult: DataFrame = null
while (true) {
// Keep only odd values of t3 and only even values of t4.
var t3Filter = t3.filter("test % 2 = 1")
var t4Filter = t4.filter("test % 2 = 0")
// (Re)register the inputs as temp views so the query sees the current t1.
t1.createOrReplaceTempView("T1")
t2.createOrReplaceTempView("T2")
t3Filter.createOrReplaceTempView("T3")
t4Filter.createOrReplaceTempView("T4")
t5.createOrReplaceTempView("T5")
// Inner join T1 with T2, then left join T3..T5, keeping only T1's columns.
// T2..T5 are small, so these joins are presumably the broadcast candidates
// governed by autoBroadcastJoinThreshold - confirm with the physical plan.
// NOTE(review): table references mix case (T1 vs t2..t5); this presumably
// relies on case-insensitive resolution (spark.sql.caseSensitive=false).
var query =
""" SELECT T1.* FROM T1
| INNER JOIN T2 ON T1.test=t2.test
| LEFT JOIN T3 ON T1.test=t3.test
| LEFT JOIN T4 ON T1.test=t4.test
| LEFT JOIN T5 ON T1.test=t5.test
| """.stripMargin
// NOTE(review): t1 is non-null before the loop and is only ever reassigned
// to DataFrame results, so this branch is dead; only the else branch runs.
if (t1 == null) {
t1 = spark.sql(query)
t1.persist().count()
} else {
// New t1 = this iteration's join result UNION the previous t1. The new t1 is
// cached and materialized with count() before the previous t1 (tmp2) is
// unpersisted.
// NOTE(review): the previous t1 appears twice in the new plan (via view T1
// inside tmp1, and directly in the union), so t1's logical plan grows every
// iteration. Size growth may therefore reflect plan/lineage growth, not only
// broadcast data - confirm.
var tmp1 = spark.sql(query)
var tmp2 = t1
t1 = tmp1.union(tmp2)
t1.persist().count()
tmp2.unpersist(true)
tmp2 = null
}
// Print the estimated size of t1 in whole MB (integer division).
// NOTE(review): SizeEstimator presumably measures the JVM object graph
// reachable from the Dataset object (plan, session references), not the
// cached data itself - confirm.
println("t1 : " + (SizeEstimator.estimate(t1) / 1024 / 1024))
// Placeholder for per-iteration work; intentionally empty in this reproduction.
// Drop the temp views; they are re-created at the start of the next iteration.
spark.catalog.dropTempView("T1")
spark.catalog.dropTempView("T2")
spark.catalog.dropTempView("T3")
spark.catalog.dropTempView("T4")
spark.catalog.dropTempView("T5")
}
// NOTE(review): unreachable - the while (true) loop above never exits.
t3.unpersist(true)
t2.unpersist(true)
t1.unpersist(true)
t4.unpersist(true)
t5.unpersist(true)
println("VOID")
}
{code}
Hope this helps.
> spark.sql.autoBroadcastJoinThreshold causing OOM exception in the driver
> -------------------------------------------------------------------------
>
> Key: SPARK-23427
> URL: https://issues.apache.org/jira/browse/SPARK-23427
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Environment: SPARK 2.0 version
> Reporter: Dhiraj
> Priority: Critical
>
> We are facing an issue related to the value of spark.sql.autoBroadcastJoinThreshold.
> With spark.sql.autoBroadcastJoinThreshold set to -1 (disabled), driver memory
> usage stays flat.
> With any other value (10MB, 5MB, 2MB, 1MB, 10K, 1K), driver memory usage grows
> at a rate that depends on the autoBroadcastJoinThreshold value, until the
> driver fails with an OOM exception. The problem is that memory used by
> auto-broadcast is not being freed in the driver.
> The application imports Oracle tables as master DataFrames, which are persisted.
> Each job applies filters to these tables and then registers them as temp views.
> SQL queries are then used to process the data further. At the end, all the
> intermediate DataFrames are unpersisted.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]