Hi Ajay,

Can you please try running the same code with spark.shuffle.spill=false and
see if the numbers turn out correctly?  That parameter controls whether or
not the buggy code that Matei fixed in ExternalAppendOnlyMap is used.
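
For reference, here is a minimal sketch of how that setting could be applied,
assuming the same SparkConf-based setup as in your snippet quoted below. The
object name and app name are placeholders I made up, and the master URL is
assumed to be supplied elsewhere (for example through the spark.master system
property):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver object; only the spark.shuffle.spill line is new
// relative to the settings in the quoted snippet.
object JoinCountCheck {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("join-count-check") // placeholder name
    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")
    // Turn off spilling to disk during shuffles, so the spilling code path
    // in ExternalAppendOnlyMap is not exercised.
    sparkConf.set("spark.shuffle.spill", "false")

    val sc = new SparkContext(sparkConf)
    // ... build the two RDDs, join them, and compare the count as before ...
    sc.stop()
  }
}
```

Note that with spilling turned off the aggregated data has to fit in memory,
so this is meant as a diagnostic run rather than a permanent setting.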

FWIW, I saw similar issues in 0.9.0 but no longer in 0.9.1, after (I think)
some fixes to spilling landed.

Andrew


On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia <matei.zaha...@gmail.com>
wrote:

> Hey Ajay, thanks for reporting this. There was indeed a bug, specifically
> in the way join tasks spill to disk (which happened when you had more
> concurrent tasks competing for memory). I’ve posted a patch for it here:
> https://github.com/apache/spark/pull/986. Feel free to try that if you’d
> like; it will also be in 0.9.2 and 1.0.1.
>
> Matei
>
> On Jun 5, 2014, at 12:19 AM, Ajay Srivastava <a_k_srivast...@yahoo.com>
> wrote:
>
> Sorry for replying late. It was night here.
>
> Lian/Matei,
> Here is the code snippet -
>     sparkConf.set("spark.executor.memory", "10g")
>     sparkConf.set("spark.cores.max", "5")
>
>     val sc = new SparkContext(sparkConf)
>
>     val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location")
>       .map(getKeyValueFromString(_, 0, ',', true))
>
>     val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType")
>       .map(getKeyValueFromString(_, 0, ',', true))
>
>     val joinedRDD = accId2LocRDD.join(accId2DemoRDD)
>
>   def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char,
>       retFullLine: Boolean): Tuple2[String, String] = {
>     val splits = line.split(delimit)
>     if (splits.length <= 1) {
>       (null, null)
>     } else if (retFullLine) {
>       (splits(keyIndex), line)
>     } else {
>       (splits(keyIndex), splits(splits.length - keyIndex - 1))
>     }
>   }
>
> Both of these files have 10 M records with the same unique keys. The file
> size is nearly 280 MB and the block size in HDFS is 256 MB. The output of
> the join should contain 10 M records.
>
> We have done some more experiments -
> 1) Running cogroup instead of join - it also gives an incorrect count.
> 2) Running union followed by groupByKey and then filtering records with
> two entries in sequence - it also gives an incorrect count.
> 3) Increasing spark.executor.memory to 50g - everything works fine. The
> count comes to 10 M for the join, cogroup and union/groupByKey/filter
> transformations.
>
> I thought that 10g would be enough memory for the executors, but even if
> memory is short it should not result in incorrect computation. There is
> probably a problem in reconstructing RDDs when memory is insufficient.
>
> Thanks Chen for your observation. I get this problem on a single worker, so
> there cannot be any mismatch of jars. On two workers, since the executor
> memory gets doubled, the code works fine.
>
> Regards,
> Ajay
>
>
>   On Thursday, June 5, 2014 1:35 AM, Matei Zaharia <
> matei.zaha...@gmail.com> wrote:
>
>
>  If this isn’t the problem, it would be great if you can post the code
> for the program.
>
> Matei
>
> On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen <xche...@gmail.com> wrote:
>
> Maybe your two workers have different assembly jar files?
> I just ran into a similar problem where my spark-shell was using a
> different jar file than my workers - got really confusing results.
> On Jun 4, 2014 8:33 AM, "Ajay Srivastava" <a_k_srivast...@yahoo.com>
> wrote:
>
> Hi,
>
> I am joining two RDDs, and the join gives different results (counting the
> number of records) each time I run this code on the same input.
>
> The input files are large enough to be divided into two splits. When the
> program runs on two workers with a single core assigned to each, the output
> is consistent and looks correct. But when a single worker is used with two
> or more cores, the result seems to be random. The count of joined records
> is different every time.
>
> Does this sound like a defect, or do I need to take care of something while
> using join? I am using spark-0.9.1.
>
> Regards
> Ajay
>
>
>
>
>
>
