Hello all,

I am currently profiling a Spark application to squeeze out a little more performance, and I ran into the code below (attached in the email).

Looking into where the time goes, I found that in org.apache.spark.sql.catalyst.CatalystTypeConverters.ArrayConverter we still take the boxing/unboxing path even when the element type is primitive, because the conversion goes through org.apache.spark.sql.catalyst.util.ArrayData#toArray rather than the specialized ArrayData.toDoubleArray that VectorUDT uses. My question: should I provide a patch, can someone show me how to get the same performance from an array column as from a DenseVector, or should I open a JIRA ticket?

Thanks
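For concreteness, here is a minimal sketch of the two conversion paths I am comparing. It pokes at Spark-internal classes (ArrayData is not public API), so exact signatures may differ between versions; treat it as an illustration only:

import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.DoubleType

val ad = ArrayData.toArrayData(Array(1.0, 2.0, 3.0))

// Generic path, as taken by ArrayConverter for a Seq[Double] UDF input:
// each element comes back boxed as a java.lang.Double.
val boxed: Array[Any] = ad.toArray[Any](DoubleType)

// Specialized path, as used when VectorUDT deserializes its values field:
// one bulk copy into a primitive double[], no boxing.
val primitive: Array[Double] = ad.toDoubleArray()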
import org.apache.spark.ml.linalg.{DenseVector, Vectors}
import org.apache.spark.sql.functions._
import scala.util.Random
import spark.implicits._

// Dot product reading the DenseVector's backing primitive double[] directly.
val dotVector = udf { (x: DenseVector, y: DenseVector) =>
  var i = 0
  var dotProduct = 0.0
  val size = x.size
  val v1 = x.values
  val v2 = y.values
  while (i < size) {
    dotProduct += v1(i) * v2(i)
    i += 1
  }
  dotProduct
}

// Same dot product over Seq[Double], i.e. through Catalyst's array converter.
val dotSeq = udf { (x: Seq[Double], y: Seq[Double]) =>
  var i = 0
  var dotProduct = 0.0
  val size = x.size
  while (i < size) {
    dotProduct += x(i) * y(i)
    i += 1
  }
  dotProduct
}

// Wall-clock time of a block, in seconds.
def time(name: String, block: => Unit): Float = {
  val t0 = System.nanoTime()
  block // call-by-name
  val t1 = System.nanoTime()
  //println(s"$name: " + (t1 - t0) / 1000000000f + "s")
  (t1 - t0) / 1000000000f
}

// Wrap an array column in an ml DenseVector so the same data can go through
// VectorUDT's deserialization path.
val densevector = udf { (p: Seq[Double]) => Vectors.dense(p.toArray) }

// Deterministic 300-element random vector per (line, column) pair.
val genVec = udf { (l: Int, c: Int) =>
  val r = new Random(l * c)
  (1 to 300).map(_ => r.nextDouble()).toArray
}

// 1000 x 200 = 200k rows, each carrying the same vectors both as arrays
// and as DenseVectors.
val dfBig = Seq(1).toDF("s")
  .withColumn("line", explode(lit((1 to 1000).toArray)))
  .withColumn("column", explode(lit((1 to 200).toArray)))
  .withColumn("v1", genVec((col("line") + 22) * -1, col("column")))
  .withColumn("v2", genVec(col("line"), col("column")))
  .withColumn("v1d", densevector(col("v1")))
  .withColumn("v2d", densevector(col("v2")))
  .repartition(1)
  .persist()

dfBig.count
dfBig.show(10)

// Average of 20 runs for each variant.
val arrayTime = (1 to 20).map { _ =>
  time("array", dfBig.withColumn("dot", dotSeq(col("v1"), col("v2"))).sort(desc("dot")).limit(10).collect())
}.sum / 20

val vectorTime = (1 to 20).map { _ =>
  time("vector", dfBig.withColumn("dot", dotVector(col("v1d"), col("v2d"))).sort(desc("dot")).limit(10).collect())
}.sum / 20

// Vector time as a percentage of array time; below 100 means vectors win.
vectorTime / arrayTime * 100