[ 
https://issues.apache.org/jira/browse/SPARK-10645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14964068#comment-14964068
 ] 

Arvind Surve commented on SPARK-10645:
--------------------------------------

Spearman's correlation coefficient (SpCoeff) does not fit into the UDAF model, 
as rank needs to be calculated for every column independently.
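
To illustrate the ranking step: a value's rank depends on every other value in 
its column, so it is only known once the whole column has been sorted. The 
snippet below is a minimal local sketch of average ranking in plain Scala, with 
no Spark involved. AverageRankSketch and averageRanks are hypothetical names 
used only for illustration.

```scala
// Hypothetical helper (not a Spark API): 1-based average ranks of one column.
object AverageRankSketch {
  def averageRanks(xs: Seq[Int]): Seq[Double] = {
    // 1-based positions that each distinct value occupies in sorted order.
    val positions: Map[Int, Seq[Int]] =
      xs.sorted.zipWithIndex
        .groupBy { case (value, _) => value }
        .map { case (value, group) =>
          value -> group.map { case (_, idx) => idx + 1 }
        }
    // Tied values share the mean of the positions they occupy.
    xs.map { x =>
      val p = positions(x)
      p.sum.toDouble / p.size
    }
  }

  def main(args: Array[String]): Unit = {
    println(averageRanks(Seq(10, 20, 20, 30)).mkString(", ")) // 1.0, 2.5, 2.5, 4.0
    println(averageRanks(Seq(3, 1, 2)).mkString(", "))        // 3.0, 1.0, 2.0
  }
}
```

The two tied values (20, 20) occupy sorted positions 2 and 3, so both receive 
rank 2.5.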

I have written a stand-alone method that computes SpCoeff end to end; it is 
outlined below.
The method takes two arrays, one per column, and returns SpCoeff (it can be 
converted to take two RDDs as input parameters). It can be added to 
org.apache.spark.sql.execution.stat.StatFunctions.scala and invoked from 
corr() when the method is "spearman".

Please provide feedback on this approach, and we can go from there.

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  // Calculates Spearman's rank correlation coefficient for two columns.
  // Reference:
  // https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient
  def computeSpearmanCorrCoeff(sc: SparkContext, data1: Array[Int],
      data2: Array[Int]): Double = {
    require(data1.length == data2.length,
      "both columns must have the same length")

    val rddData1 = sc.parallelize(data1)
    val rddData2 = sc.parallelize(data2)

    // Maps each distinct value to its 1-based rank. Tied values get the
    // average of the positions they occupy in sorted order.
    def averageRank(data: RDD[Int]): RDD[(Int, Double)] =
      data.zipWithIndex()
        .sortByKey()
        .zipWithIndex()
        .map { case ((value, _), pos) => (value, (pos + 1.0, 1.0)) }
        .reduceByKey { case ((r1, n1), (r2, n2)) =>
          ((r1 * n1 + r2 * n2) / (n1 + n2), n1 + n2)
        }
        .mapValues { case (rank, _) => rank }

    // Ranks for the first and second vectors.
    val rddData1Rank = averageRank(rddData1)
    val rddData2Rank = averageRank(rddData2)

    // Sum of squared rank differences between corresponding elements of the
    // two vectors, paired in their original order.
    val sumSqRankDiff = rddData1.zip(rddData2)
      .join(rddData1Rank).map { case (_, (y, rankX)) => (y, rankX) }
      .join(rddData2Rank).map { case (_, (rankX, rankY)) =>
        (rankY - rankX) * (rankY - rankX)
      }.sum()

    // Length of the vector. Count the input rather than the rank RDD: the
    // rank RDD holds one entry per distinct value, so it is shorter than the
    // input whenever there are ties. Double avoids Long overflow in n^3.
    val dataLen = rddData1.count().toDouble

    // Spearman's rank correlation coefficient. This closed form is exact
    // only when neither column contains ties.
    1 - (6 * sumSqRankDiff) / (dataLen * (dataLen * dataLen - 1))
  }
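
As a sanity check on the final formula, the closed form in the return statement 
can be compared against the Pearson correlation of the two rank vectors, which 
is the general definition of Spearman's coefficient. The two agree when there 
are no ties and differ slightly when there are. The sketch below is plain 
Scala for small inputs only; SpearmanCheckSketch, closedForm and 
pearsonOfRanks are hypothetical names, not Spark APIs.

```scala
// Hypothetical helpers for checking results on small inputs; not Spark APIs.
object SpearmanCheckSketch {
  // Closed form 1 - 6 * sum(d^2) / (n * (n^2 - 1)); exact only without ties.
  def closedForm(r1: Seq[Double], r2: Seq[Double]): Double = {
    require(r1.size == r2.size, "rank vectors must have the same length")
    val n = r1.size.toDouble
    val sumSqDiff = r1.zip(r2).map { case (a, b) => (a - b) * (a - b) }.sum
    1.0 - 6.0 * sumSqDiff / (n * (n * n - 1.0))
  }

  // Pearson correlation of the two rank vectors (the general definition).
  def pearsonOfRanks(r1: Seq[Double], r2: Seq[Double]): Double = {
    require(r1.size == r2.size, "rank vectors must have the same length")
    val m1 = r1.sum / r1.size
    val m2 = r2.sum / r2.size
    val cov = r1.zip(r2).map { case (a, b) => (a - m1) * (b - m2) }.sum
    val v1 = r1.map(a => (a - m1) * (a - m1)).sum
    val v2 = r2.map(b => (b - m2) * (b - m2)).sum
    cov / math.sqrt(v1 * v2)
  }

  def main(args: Array[String]): Unit = {
    // Ranks without ties: both computations give about 0.8.
    val a1 = Seq(1.0, 2.0, 3.0, 4.0, 5.0)
    val a2 = Seq(2.0, 1.0, 4.0, 3.0, 5.0)
    println(closedForm(a1, a2))
    println(pearsonOfRanks(a1, a2))

    // Ranks with one tie in the first column: the two results differ
    // (about 0.95 versus about 0.9487).
    val b1 = Seq(1.0, 2.5, 2.5, 4.0)
    val b2 = Seq(1.0, 2.0, 3.0, 4.0)
    println(closedForm(b1, b2))
    println(pearsonOfRanks(b1, b2))
  }
}
```

If exact results are needed for data with ties, computing the Pearson 
correlation on the rank vectors is one option to consider.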


-Arvind Surve

> Bivariate Statistics: Spearman's Correlation support as UDAF
> ------------------------------------------------------------
>
>                 Key: SPARK-10645
>                 URL: https://issues.apache.org/jira/browse/SPARK-10645
>             Project: Spark
>          Issue Type: Sub-task
>          Components: ML, SQL
>            Reporter: Jihong MA
>
> Spearman's rank correlation coefficient : 
> https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient



