Sreelal S L commented on SPARK-17621:


Our actual code is a bit different from what I have given. We use streaming, and
use transform() to reuse a couple of the DataFrame operations from other parts of
our code base. I don't have much control to change the code there (worst case, we
will have to make changes there). But I feel something is wrong here.

I hit the issue there, and was trying out samples to figure out exactly where
the issue is coming from.
It looks like slightly unexpected behaviour: since it works for groupBy(),
the behaviour should be the same for orderBy().
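
To make the groupBy()/orderBy() difference concrete, here is a minimal pure-Scala model with no Spark dependency. It assumes, without confirming it from the Spark source, that orderBy() shuffles with range partitioning, which has to look at the input to choose partition bounds before it can route any record, while groupBy() uses hash partitioning and needs no such pass. `PartitioningSketch`, `CountingSource` and the bound-selection rule are hypothetical names and simplifications, not Spark internals.

```scala
// Hypothetical model, not Spark code: counts how often the upstream map() runs.
object PartitioningSketch {
  // Stand-in for an upstream RDD whose map() has a side effect (like accumulator1 += 1).
  final class CountingSource[T](data: Seq[T]) {
    var evaluations = 0
    // Every call models one job re-reading the lineage and re-running map().
    def compute(): Iterator[T] = data.iterator.map { x => evaluations += 1; x }
  }

  // Hash partitioning: a key's partition follows from its hash alone,
  // so a single pass over the input is enough.
  def hashPartition[K](src: CountingSource[K], numPartitions: Int): Map[Int, Vector[K]] =
    src.compute().toVector.groupBy(k => Math.floorMod(k.hashCode, numPartitions))

  // Range partitioning: bounds must be chosen before any key can be routed,
  // so pass 1 reads the keys to pick bounds and pass 2 routes them.
  def rangePartition[K](src: CountingSource[K], numPartitions: Int)(implicit ord: Ordering[K]): Map[Int, Vector[K]] = {
    val sample = src.compute().toVector.sorted // pass 1: "sampling job" (this model reads every key)
    val bounds = (1 until numPartitions)
      .flatMap(i => sample.lift(i * sample.size / numPartitions))
      .distinct
    src.compute().toVector.groupBy(k => bounds.count(b => ord.gt(k, b))) // pass 2: the shuffle itself
  }
}
```

With a single input row, the hash model runs map() once and the range model runs it twice. If the assumption holds, that extra pass would be one place a second accumulator increment could come from.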

Also, the map() which increments the accumulator is invoked only once, so it seems
to have something to do with a stage result being calculated twice.

I can understand the accumulator map() being called twice if some task failure
happens. But that's not the case here: all tasks are successful, and the map()
doing the accumulator addition is called only once.
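
For reference, the Spark programming guide only guarantees that each task's accumulator update is applied once for updates performed inside actions; for updates inside transformations such as map(), it warns that an update may be applied more than once if tasks or job stages are re-executed. The pure-Scala model below (no Spark dependency; `RecomputeSketch`, `lineage` and `twoJobs` are hypothetical names) sketches how two jobs reading the same un-cached lazy lineage double a counter, and how materializing the data once keeps it at 1. Whether this is exactly what happens under orderBy() is an assumption here; the loose Spark analogue would be caching `usersDFwithCount` before building the DataFrame.

```scala
// Hypothetical model, not Spark code: lazy lineage vs. a materialized (cached) result.
object RecomputeSketch {
  var mapCalls = 0 // plays the role of accumulator1

  // Models usersDF.rdd.map(x => { accumulator1 += 1; x }): lazy, so it runs
  // again for every job that reads it.
  def lineage(data: Seq[String]): Iterator[String] =
    data.iterator.map { x => mapCalls += 1; x }

  // Two jobs read the same lineage (e.g. a sampling job, then the sort job).
  // With cached = true the lineage is materialized once, loosely like rdd.cache().
  def twoJobs(data: Seq[String], cached: Boolean): Int = {
    mapCalls = 0
    val source: () => Iterator[String] =
      if (cached) { val materialized = lineage(data).toVector; () => materialized.iterator }
      else { () => lineage(data) }
    val firstJob = source().toVector         // job 1
    val secondJob = source().toVector.sorted // job 2
    require(firstJob.size == secondJob.size)
    mapCalls
  }
}
```

For the single-row input, `twoJobs(Seq("sreelal"), cached = false)` returns 2 and `twoJobs(Seq("sreelal"), cached = true)` returns 1.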

> Accumulator value is doubled when using DataFrame.orderBy()
> -----------------------------------------------------------
>                 Key: SPARK-17621
>                 URL: https://issues.apache.org/jira/browse/SPARK-17621
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, SQL
>    Affects Versions: 2.0.0
>         Environment: Development environment (Eclipse, single process)
>            Reporter: Sreelal S L
>            Priority: Minor
> We are tracking the records read by our source using an accumulator. We do an
> orderBy on the DataFrame before the output operation. When the job
> completes, the accumulator value is double the expected value.
> Below is the sample code I ran.
> {code}
> import org.apache.spark.sql.SparkSession
>
> val sqlContext = SparkSession.builder()
>   .config("spark.sql.retainGroupColumns", false)
>   .config("spark.sql.warehouse.dir", "file:///C:/Test")
>   .master("local[*]")
>   .getOrCreate()
> val sc = sqlContext.sparkContext
> // Counts the records passing through the map() below
> val accumulator1 = sc.accumulator(0, "accumulator1")
> // users.json holds a single row: {"name":"sreelal","country":"IND"}
> val usersDF = sqlContext.read.json("C:\\users.json")
> val usersDFwithCount = usersDF.rdd.map { x => accumulator1 += 1; x }
> val counterDF = sqlContext.createDataFrame(usersDFwithCount, usersDF.schema)
> val orderedDF = counterDF.orderBy("name")
> val collected = orderedDF.collect()
> collected.foreach(println)
> println("accumulator1 : " + accumulator1.value) // expected 1, observed 2
> println("Done")
> {code}
> I have only one row in the users.json file. I expect accumulator1 to have
> value 1, but it comes out as 2.
> In the Spark SQL UI, I see two jobs being generated for it.

This message was sent by Atlassian JIRA