[
https://issues.apache.org/jira/browse/SPARK-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15465375#comment-15465375
]
Ryan Blue commented on SPARK-17396:
-----------------------------------
I'm not sure that the ForkJoinPool is to blame. Each partition in a Hive table
becomes an RDD in the union, and each UnionRDD creates a new ForkJoinPool
(parallelism=8) to list the files in a partition in parallel. I think the more
likely problem is that we don't use a shared ForkJoinPool, but instead create a
new one for each RDD that isn't cleaned up until the UnionRDD is cleaned. Even
if the ForkJoinPool can't reuse threads and we end up with more than 8 per
pool, the thread count wouldn't grow to thousands if we weren't creating so
many pools. That said, I agree that we should consider a different executor
service. I've been reading up on ForkJoinPool and it doesn't look like a great
implementation; even determining the maximum number of threads it will use was
painfully undocumented.
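To illustrate the shared-pool idea, something like this sketch would do (not Spark's actual listing code; `SharedPartitionListing` and `listPartitions` are made-up names for illustration):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Sketch of the shared-pool idea: all listing work runs on one JVM-wide
// work-stealing (ForkJoin) pool instead of a fresh ForkJoinPool per RDD.
// SharedPartitionListing and listPartitions are hypothetical, not Spark API.
object SharedPartitionListing {
  // One shared pool, sized like the per-RDD pools in 2.0.0 (parallelism=8).
  private val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(8))

  def listPartitions[A, B](partitions: Seq[A])(list: A => B): Seq[B] = {
    implicit val e: ExecutionContext = ec
    // Fan the per-partition listing out on the shared pool, keep input order.
    val futures = partitions.map(p => Future(list(p)))
    Await.result(Future.sequence(futures), 10.minutes)
  }
}
```

With a single pool like this, the thread count is bounded regardless of how many RDDs are in the union.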
[~pin_zhang], can you post a list of the thread names? We can use the names to
determine how many threads there are in each pool. That should tell us whether
we should use a different executor service in addition to using a shared pool.
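For gathering those names, a rough sketch like this would work (ForkJoinPool worker threads are named like "ForkJoinPool-N-worker-M", so grouping on the name with the trailing index stripped gives a per-pool count; `ThreadDump` is a made-up helper name):

```scala
import scala.collection.JavaConverters._

// Hypothetical diagnostic helper: count live threads grouped by name prefix,
// e.g. "ForkJoinPool-17-worker" -> 8, so we can see threads per pool.
object ThreadDump {
  def countsByPrefix(): Map[String, Int] = {
    Thread.getAllStackTraces.keySet.asScala.toSeq
      .map(_.getName.replaceAll("-\\d+$", "")) // drop trailing thread index
      .groupBy(identity)
      .map { case (prefix, names) => prefix -> names.size }
  }
}
```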
> Threads number keep increasing when query on external CSV partitioned table
> ---------------------------------------------------------------------------
>
> Key: SPARK-17396
> URL: https://issues.apache.org/jira/browse/SPARK-17396
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.0.0
> Reporter: pin_zhang
>
> 1. Create an external partitioned table with CSV row format
> 2. Add 16 partitions to the table
> 3. Run the SQL "select count(*) from test_csv"
> 4. The number of ForkJoin threads keeps increasing
> This happens when the number of table partitions is greater than 10.
> 5. Test code
> {code:lang=scala}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.hive.HiveContext
>
> object Bugs {
>   def main(args: Array[String]): Unit = {
>     val location = "file:///g:/home/test/csv"
>     val create = s"""CREATE EXTERNAL TABLE test_csv
>       (ID string, SEQ string)
>       PARTITIONED BY (index int)
>       ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
>       LOCATION '${location}'
>       """
>     val add_part = s"""
>       ALTER TABLE test_csv ADD
>       PARTITION (index=1) LOCATION '${location}/index=1'
>       PARTITION (index=2) LOCATION '${location}/index=2'
>       PARTITION (index=3) LOCATION '${location}/index=3'
>       PARTITION (index=4) LOCATION '${location}/index=4'
>       PARTITION (index=5) LOCATION '${location}/index=5'
>       PARTITION (index=6) LOCATION '${location}/index=6'
>       PARTITION (index=7) LOCATION '${location}/index=7'
>       PARTITION (index=8) LOCATION '${location}/index=8'
>       PARTITION (index=9) LOCATION '${location}/index=9'
>       PARTITION (index=10) LOCATION '${location}/index=10'
>       PARTITION (index=11) LOCATION '${location}/index=11'
>       PARTITION (index=12) LOCATION '${location}/index=12'
>       PARTITION (index=13) LOCATION '${location}/index=13'
>       PARTITION (index=14) LOCATION '${location}/index=14'
>       PARTITION (index=15) LOCATION '${location}/index=15'
>       PARTITION (index=16) LOCATION '${location}/index=16'
>       """
>     val conf = new SparkConf().setAppName("scala").setMaster("local[2]")
>     conf.set("spark.sql.warehouse.dir", "file:///g:/home/warehouse")
>     val ctx = new SparkContext(conf)
>     val hctx = new HiveContext(ctx)
>     hctx.sql(create)
>     hctx.sql(add_part)
>     // Start six threads that each run the count query in a loop.
>     for (_ <- 1 to 6) {
>       new Query(hctx).start()
>     }
>   }
>
>   class Query(hctx: HiveContext) extends Thread {
>     setName("Query-Thread")
>     override def run(): Unit = {
>       while (true) {
>         hctx.sql("select count(*) from test_csv").show()
>         Thread.sleep(100)
>       }
>     }
>   }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)