Ravindra Pesala created SPARK-3100:
--------------------------------------

             Summary: Spark RDD partitions are not running in the workers as 
per locality information given by each partition.
                 Key: SPARK-3100
                 URL: https://issues.apache.org/jira/browse/SPARK-3100
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.0.0
         Environment: Running in Spark Standalone Cluster
            Reporter: Ravindra Pesala


I created a simple custom RDD (SampleRDD.scala) with 4 splits, one for each of 
the 4 workers.
When I run this RDD on a Spark standalone cluster with 4 workers (the master 
machine also hosts one worker), all partitions run on a single node, even though 
I have given locality preferences in my SampleRDD program. 

*Sample Code*

/**
 * Partition of a [[SampleRDD]].
 *
 * @param rddId      id of the owning RDD; only used when computing the hash code
 * @param idx        position of this partition within the RDD
 * @param tableSplit host names attached to this partition
 */
class SamplePartition(
    rddId: Int,
    val idx: Int,
    val tableSplit: Seq[String])
  extends Partition {

  /** Partition index, identical to `idx`. */
  override val index: Int = idx

  /** Combines the owning RDD's id with the partition index. */
  override def hashCode(): Int = {
    val seed = 41
    seed * (seed + rddId) + idx
  }
}


/**
 * Minimal custom RDD used to reproduce the locality problem: it exposes four
 * partitions, each carrying a single host name as its preferred location, and
 * yields no records.
 *
 * @param sc       context this RDD is created in
 * @param keyClass factory the iterator uses to build a `(K, V)` pair
 */
class SampleRDD[K, V](
    sc: SparkContext,
    keyClass: KeyVal[K, V])
  extends RDD[(K, V)](sc, Nil)
  with Logging {

  /** Builds one [[SamplePartition]] per host, with that host as its only split location. */
  override def getPartitions: Array[Partition] = {
    val hosts = Array("master", "slave1", "slave2", "slave3")
    Array.tabulate[Partition](hosts.length) { i =>
      new SamplePartition(id, i, Array(hosts(i)))
    }
  }

  /**
   * Returns the iterator for one partition. It logs the split it was asked to
   * process and registers `close()` as a task-completion callback.
   *
   * @param theSplit partition to compute; must be a [[SamplePartition]]
   * @param context  task context the completion callback is registered on
   * @return an iterator that is always empty in this sample
   */
  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
    new Iterator[(K, V)] {
      val split = theSplit.asInstanceOf[SamplePartition]
      logInfo("Executed task for the split" + split.tableSplit)

      // Register an on-task-completion callback to close the input stream.
      context.addOnCompleteCallback(() => close())
      var havePair = false
      var finished = false

      override def hasNext: Boolean = {
        if (!finished && !havePair) {
          // There is no underlying reader in this sample, so the stream is
          // marked as exhausted on the very first probe.
          finished = true
          havePair = !finished
        }
        !finished
      }

      override def next(): (K, V) = {
        if (!hasNext) {
          throw new java.util.NoSuchElementException("End of stream")
        }
        havePair = false
        val key = new Key()
        val value = new Value()
        keyClass.getKey(key, value)
      }

      /** Placeholder for releasing the input; the real close call is commented out. */
      private def close(): Unit = {
        try {
          // reader.close()
        } catch {
          case e: Exception => logWarning("Exception in RecordReader.close()", e)
        }
      }
    }
  }

  /**
   * Returns the hosts stored in the partition, excluding "localhost".
   *
   * @param split partition to inspect; must be a [[SamplePartition]]
   * @return the remaining host names, possibly empty
   */
  override def getPreferredLocations(split: Partition): Seq[String] = {
    val hosts = split.asInstanceOf[SamplePartition].tableSplit.filter(_ != "localhost")
    // The filter can leave nothing behind, so never index the result blindly:
    // hosts(0) would throw IndexOutOfBoundsException on an empty sequence.
    logInfo("Host Name : " + hosts.headOption.getOrElse("<none>"))
    hosts
  }
}

/**
 * Serializable factory that turns a [[Key]] and a [[Value]] into a pair.
 *
 * @tparam K type of the first element of the produced pair
 * @tparam V type of the second element of the produced pair
 */
trait KeyVal[K, V] extends Serializable {

  /** Builds the `(K, V)` pair for the given key and value holders. */
  def getKey(key: Key, value: Value): (K, V)
}
/** [[KeyVal]] implementation that hands back its arguments unchanged as a pair. */
class KeyValImpl extends KeyVal[Key, Value] {

  /** Pairs the given key with the given value. */
  override def getKey(key: Key, value: Value): (Key, Value) = key -> value
}
/** Field-less placeholder key type used by the sample. */
case class Key()
/** Field-less placeholder value type used by the sample. */
case class Value()

/** Driver program for the reproduction. */
object SampleRDD {

  /**
   * Creates a `SparkContext` against the standalone master, builds a
   * `SampleRDD` and collects it, then stops the context.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Jar(s) that contain this class, handed to the SparkContext as its jar list.
    val jars = SparkContext.jarOfClass(this.getClass).toList
    val sc = new SparkContext(
      "spark://master:7077", "SampleSpark", "/opt/spark-1.0.0-rc3/", jars)
    try {
      val rdd = new SampleRDD(sc, new KeyValImpl())
      rdd.collect()
    } finally {
      // Always stop the context so the application is shut down even when
      // the job throws.
      sc.stop()
    }
  }
}

The following is the log output.

INFO  18-08 16:38:33,382 - Executor updated: app-20140818163833-0005/0 is now 
RUNNING
INFO  18-08 16:38:33,382 - Executor updated: app-20140818163833-0005/2 is now 
RUNNING
INFO  18-08 16:38:33,383 - Executor updated: app-20140818163833-0005/1 is now 
RUNNING
INFO  18-08 16:38:33,385 - Executor updated: app-20140818163833-0005/3 is now 
RUNNING
INFO  18-08 16:38:34,976 - Registered executor: Actor 
akka.tcp://sparkExecutor@master:47563/user/Executor#-398354094 with ID 0
INFO  18-08 16:38:34,984 - Starting task 0.0:0 as TID 0 on executor 0: *master 
(PROCESS_LOCAL)*
INFO  18-08 16:38:34,989 - Serialized task 0.0:0 as 1261 bytes in 3 ms
INFO  18-08 16:38:34,992 - Starting task 0.0:1 as TID 1 on executor 0: *master 
(PROCESS_LOCAL)*
INFO  18-08 16:38:34,993 - Serialized task 0.0:1 as 1261 bytes in 0 ms
INFO  18-08 16:38:34,993 - Starting task 0.0:2 as TID 2 on executor 0: *master 
(PROCESS_LOCAL)*
INFO  18-08 16:38:34,993 - Serialized task 0.0:2 as 1261 bytes in 0 ms
INFO  18-08 16:38:34,994 - Starting task 0.0:3 as TID 3 on executor 0: *master 
(PROCESS_LOCAL)*
INFO  18-08 16:38:34,994 - Serialized task 0.0:3 as 1261 bytes in 0 ms
INFO  18-08 16:38:35,174 - Registering block manager master:42125 with 294.4 MB 
RAM
INFO  18-08 16:38:35,296 - Registered executor: Actor 
akka.tcp://sparkExecutor@slave1:31726/user/Executor#492173410 with ID 2
INFO  18-08 16:38:35,302 - Registered executor: Actor 
akka.tcp://sparkExecutor@slave2:25769/user/Executor#1762839887 with ID 1
INFO  18-08 16:38:35,317 - Registered executor: Actor 
akka.tcp://sparkExecutor@slave3:51032/user/Executor#981476000 with ID 3







--
This message was sent by Atlassian JIRA
(v6.2#6252)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to