I am working on Scala code that performs linear regression on certain
datasets. Right now I am using 20 cores and 25 executors, and every time I run
the Spark job I get a different result.

The input files are 2 GB and 400 MB. However, when I run the job
with 20 cores and 1 executor, I get consistent results.
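
For context, the job is submitted roughly like this (the jar name and file
arguments below are placeholders, and the exact split of the 20 cores across
executors is approximate; I am assuming the usual YARN-style flags since the
code reads spark.executor.instances from the conf):

    spark-submit --class TextProcess --master yarn \
      --num-executors 25 --executor-cores 20 \
      textprocess.jar <cases-file> <gene-expression-file> <output-dir>

The "consistent" run is identical except for --num-executors 1.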

Has anyone experienced such a thing so far?

Please find the code below:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.Partitioner
import org.apache.spark.storage.StorageLevel

object TextProcess{
  def main(args: Array[String]){
            val conf = new SparkConf()
              .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            val sc = new SparkContext(conf)
            val sqlContext = new org.apache.spark.sql.SQLContext(sc)
            val numExecutors = conf.get("spark.executor.instances").toInt
            // Read the 2 input files
            // First file is either cases / controls
            val input1 = sc.textFile(args(0))
            // Second file is Gene Expression
            val input2 = sc.textFile(args(1))

              //collecting header information
            val header1=sc.parallelize(input1.take(1))
            val header2=sc.parallelize(input2.take(1))

            //mapping data without the header information
            val map1 = input1.subtract(header1).map(x => (x.split(" ")(0) + x.split(" ")(1), x))
            val map2 = input2.subtract(header2).map(x => (x.split(" ")(0) + x.split(" ")(1), x))


            //joining data. This is where the order was getting affected.
            val joinedMap = map1.join(map2)
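            // (my understanding: join() shuffles both RDDs, so the row order of joinedMap
            //  is not guaranteed and can change between runs; the zipWithIndex calls below
            //  then number the rows in whatever order they happen to come out)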

            //adding the header back to the top of the RDD
            val x = header1.union(joinedMap.map{case(x,(y,z))=>y})

            val y = header2.union(joinedMap.map{case(x,(y,z))=>z})

            //removing irrelevant columns
            val rddX = x.map(x => x.split(" ").drop(3)).zipWithIndex
              .map{case(a, b) => a.map(x => b.toString + " " + x.toString)}
            val rddY = y.map(x => x.split(" ").drop(2)).zipWithIndex
              .map{case(a, b) => a.map(x => b.toString + " " + x.toString)}


            //transposing and cross joining data. This keeps the identifier at the start
            val transposedX = rddX.flatMap(x => x.zipWithIndex.map(x => x.swap))
              .reduceByKey((a, b) => a + ":" + b).map{case(a, b) => b.split(":").sorted}
            val transposedY = rddY.flatMap(x => x.zipWithIndex.map(x => x.swap))
              .reduceByKey((a, b) => a + ":" + b).map{case(a, b) => b.split(":").sorted}
              .persist(StorageLevel.apply(false, true, false, false, numExecutors))
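            // (StorageLevel.apply takes (useDisk, useMemory, useOffHeap, deserialized, replication),
            //  so this should be serialized in-memory storage replicated numExecutors times,
            //  if I am reading the argument order right)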

            val cleanedX = transposedX.map(x => x.map(x => x.slice(x.indexOfSlice(" ") + 1, x.length)))
            val cleanedY = transposedY.map(x => x.map(x => x.slice(x.indexOfSlice(" ") + 1, x.length)))
              .persist(StorageLevel.apply(false, true, false, false, numExecutors))


            val cartXY = cleanedX.cartesian(cleanedY)
            val finalDataSet = cartXY.map{case(a, b) => a zip b}
            //convert to key value pair
            val regressiondataset = finalDataSet.map(x => (x(0), x.drop(1)
              .filter{case(a, b) => a != "NA" && b != "NA" && a != "null" && b != "null"}
              .map{case(a, b) => (a.toDouble, b.toDouble)}))


            val linearOutput = regressiondataset.map(s => new LinearRegression(s._1, s._2).outputVal)

            linearOutput.saveAsTextFile(args(2))
            cleanedY.unpersist()
            transposedY.unpersist()

  }
}




