[ 
https://issues.apache.org/jira/browse/SPARK-33896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xudingyu updated SPARK-33896:
-----------------------------
    Description: 
*Goals:*
•       Make the Spark 3.0 scheduler datasource-cache-aware in a multi-replication 
HDFS cluster
•       Achieve an end-to-end (E2E) performance gain in workloads when this feature is enabled

*Problem Statement:*
Spark’s DAGScheduler currently schedules tasks according to an RDD’s 
preferredLocations, which respects HDFS BlockLocation. In a multi-replication 
cluster, HDFS can return an Array[BlockLocation], and Spark 
picks one of the BlockLocations to run a task on. However, +tasks can run 
faster if they are scheduled on nodes that already hold the datasource cache they need+. 
+Spark currently has no datasource cache locality provision mechanism, 
even when nodes in the cluster hold cached data+.
This project adds a cache-locality-aware mechanism: the Spark DAGScheduler 
can schedule tasks onto nodes with datasource cache, according to cache 
locality, in a multi-replication HDFS cluster.

*Basic idea:*
The basic idea is to expose a datasource cache locality provider interface in 
Spark, whose default implementation respects HDFS BlockLocation. Each worker 
node’s datasource cache metadata (e.g. offset and length of cached ranges) is stored in an 
external store such as Redis. The Spark driver can then look up this cache metadata and apply a 
custom task-locality algorithm to choose the most efficient node.
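The provider interface could be sketched as follows. All names here are illustrative assumptions, not a final API; the default implementation simply preserves today’s BlockLocation-based behavior:

```scala
// Sketch of the proposed provider interface; trait, class, and method
// names are illustrative, not final.
trait PreferredLocsProvider {
  /** Hostnames ordered by expected read efficiency for a file partition. */
  def getPreferredLocs(): Array[String]
}

// Default implementation: respect the HDFS BlockLocation hosts,
// i.e. keep the current scheduling behavior.
class HdfsBlockLocationProvider(blockHosts: Array[String])
    extends PreferredLocsProvider {
  override def getPreferredLocs(): Array[String] = blockHosts
}
```

A cache-aware implementation would instead consult the external cache-metadata store before ordering the hosts.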

*CBL (Cost Based Locality)*
CBL (cost based locality) takes cache size, disk I/O, network I/O, and so on into 
account when scheduling tasks.
Say there are three nodes A, B, and C in a 2-replication HDFS cluster. When Spark 
schedules task1, node B has all the data task1 needs replicated on local disk, 
while node A has 20% of that data in its datasource cache and 50% replicated on 
local disk.

We then calculate the cost of scheduling task1 on node A, node B, and node C:

CostA = CalculateCost(20% read from cache) + CalculateCost(50% read from disk) 
+ CalculateCost(30% read from remote)

CostB = CalculateCost(100% read from disk)

CostC = CalculateCost(100% read from remote)

Return the node with minimal cost.
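The comparison above can be sketched as follows. The cost weights are hypothetical placeholders; in practice they would come from benchmarking cache, disk, and network read throughput:

```scala
// Hypothetical relative per-unit read costs: cache < local disk < remote.
val CacheCost  = 1.0
val DiskCost   = 10.0
val RemoteCost = 100.0

// Fractions of a task's input readable from cache and local disk on a node;
// the remainder must be read remotely.
case class ReadMix(cacheFrac: Double, diskFrac: Double) {
  def remoteFrac: Double = 1.0 - cacheFrac - diskFrac
}

def calculateCost(mix: ReadMix): Double =
  mix.cacheFrac * CacheCost + mix.diskFrac * DiskCost + mix.remoteFrac * RemoteCost

// Pick the candidate node with the minimal estimated cost.
def bestNode(candidates: Map[String, ReadMix]): String =
  candidates.minBy { case (_, mix) => calculateCost(mix) }._1
```

With node A = ReadMix(0.2, 0.5), node B = ReadMix(0.0, 1.0), and node C = ReadMix(0.0, 0.0) as in the example, node B wins under these weights.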


*Modifications:*
A config is needed to decide which cache locality provider to use; it can be as 
follows:

{code:java}
SQLConf.PARTITIONED_FILE_PREFERREDLOC_IMPL

{code}
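In Spark, such an entry could be declared via SQLConf’s config builder. This is a sketch only; the config key and the default class name are illustrative assumptions, not part of the existing codebase:

```scala
// Sketch of the proposed SQLConf entry (inside object SQLConf).
// Key name and default class are illustrative, not final.
val PARTITIONED_FILE_PREFERREDLOC_IMPL =
  buildConf("spark.sql.files.preferredLocationsImpl")
    .doc("Class used to compute preferred locations for a FilePartition. " +
      "The default respects HDFS BlockLocation; alternative implementations " +
      "may take datasource cache locality into account.")
    .stringConf
    .createWithDefault(
      "org.apache.spark.sql.execution.datasources.HdfsBlockLocationProvider")
```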

For Spark 3.0, FilePartition.scala’s preferredLocations() needs to be modified; it 
can look as follows (the provider interface name is illustrative):

{code:java}
override def preferredLocations(): Array[String] = {
  // Instantiate the configured provider via reflection and delegate to it.
  Utils.classForName(SparkEnv.get.conf.get(SQLConf.PARTITIONED_FILE_PREFERREDLOC_IMPL))
    .getConstructor()
    .newInstance()
    .asInstanceOf[PreferredLocsProvider]
    .getPreferredLocs()
}
{code}


> Make Spark DAGScheduler datasource cache aware when scheduling tasks in a 
> multi-replication HDFS
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-33896
>                 URL: https://issues.apache.org/jira/browse/SPARK-33896
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Xudingyu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
