[ https://issues.apache.org/jira/browse/SPARK-11330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14978528#comment-14978528 ]

Saif Addin Ellafi edited comment on SPARK-11330 at 10/28/15 10:40 PM:
----------------------------------------------------------------------

Hello Cheng Hao, and thank you very much for trying to reproduce. 

You are right, this does not happen on very small data. I started reproducing 
it with a slightly larger dataset of a bit more than 100 rows. This is the 
smallest case I could get it down to; I am not sure I understand the issue completely.

UPDATE: This seems to be related to partitioning/data locality. With the 100-row 
dataset, I could only reproduce the issue when using a cluster-mode shell; it 
does not happen in local mode.
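
In case the partitioning theory matters, here is a rough sketch of how to check and vary the partition count and re-run the same persisted filter (dfp is just a helper name and the value 8 is arbitrary, only to force a different layout):

val dfp = sqlContext.read.json("/var/data/Saif/bug_reproduce_50k")  // same attached 50k JSON as in the repro below
println(dfp.rdd.partitions.length)                                  // how many partitions the loaded dataframe has

// force a different partition count and re-run the persisted filter
dfp.repartition(8).groupBy("mdl_loan_state", "filemonth_dt").count.persist().filter("mdl_loan_state = 'PPD00 '").count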

To make sure you can reproduce it, please use the attached JSON dataframe 
(50k rows); with it the issue shows up even in local[1] mode.

Load the attached JSON dataframe in spark-shell, local mode, no extra options:

val df = sqlContext.read.json("/var/data/Saif/bug_reproduce_50k")

df.groupBy("mdl_loan_state", "filemonth_dt").count.filter("mdl_loan_state = 'PPD00 '").count

df.groupBy("mdl_loan_state", "filemonth_dt").count.persist().filter("mdl_loan_state = 'PPD00 '").count

>>>

The first count returns 129, while the persisted one returns ~16.
With bigger data the discrepancy grows much larger and can change randomly 
from run to run.
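
For what it is worth, the only in-memory variant that gives the right number here is applying persist() after the filter (as per NOTE2 in the description below). A rough sketch with the same df; the helper val name is just for illustration:

// same grouped dataframe as above, kept in a helper val
val grouped = df.groupBy("mdl_loan_state", "filemonth_dt").count

// persist() before the filter -> wrong result (~16 on this data)
grouped.persist().filter("mdl_loan_state = 'PPD00 '").count
grouped.unpersist()

// persist() only after the filter -> correct result (129 on this data),
// but any further filter applied afterwards breaks again
grouped.filter("mdl_loan_state = 'PPD00 '").persist().count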

Thank you!



> Filter operation on StringType after groupBy PERSISTED brings no results
> ------------------------------------------------------------------------
>
>                 Key: SPARK-11330
>                 URL: https://issues.apache.org/jira/browse/SPARK-11330
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.1
>         Environment: Standalone cluster of five servers (happens as well in 
> local mode). sqlContext is an instance of HiveContext (happens as well with 
> SQLContext).
> No special options other than driver memory and executor memory.
> Parquet partitions are 512 with 160 cores available. Happens as well with 
> other partitioning.
> Data is nearly 2 billion rows.
>            Reporter: Saif Addin Ellafi
>            Priority: Blocker
>         Attachments: bug_reproduce.zip, bug_reproduce_50k.zip
>
>
> ONLY HAPPENS WHEN PERSIST() IS CALLED
> val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt")
> data.groupBy("vintages").count.select("vintages").filter("vintages = '2007-01-01'").first
> >>> res9: org.apache.spark.sql.Row = [2007-01-01]
> data.groupBy("vintages").count.persist.select("vintages").filter("vintages = '2007-01-01'").first
> >>> Exception on empty iterator stuff
> This does not happen when using a field of another type, e.g. IntegerType:
> data.groupBy("yyyymm").count.persist.select("yyyymm").filter("yyyymm = 
> 200805").first >>> res13: org.apache.spark.sql.Row = [200805]
> NOTE: I have no idea whether I used the Kryo serializer when storing this parquet.
> NOTE2: If persist is applied after the filtering, it works fine. But this is not a 
> good enough workaround, since any filter operation applied afterwards will break 
> the results.
> NOTE3: I have reproduced the issue with several different datasets.
> NOTE4: The only real workaround is to store the groupBy dataframe in a database 
> and reload it as a new dataframe.
> Querying the raw data works fine:
> data.select("vintages").filter("vintages = '2007-01-01'").first
> >>> res4: org.apache.spark.sql.Row = [2007-01-01]
> Originally, the issue happened with a larger aggregation operation; the data was 
> inconsistent, returning different results on every call.
> Reducing the operation step by step, I narrowed it down to this issue.
> In any case, the original operation was:
> val data = sqlContext.read.parquet("/var/Saif/data_pqt")
> val res = data.groupBy("product", "band", "age", "vint", "mb", "yyyymm").agg(
>   count($"account_id").as("N"), sum($"balance").as("balance_eom"), sum($"balancec").as("balance_eoc"),
>   sum($"spend").as("spend"), sum($"payment").as("payment"), sum($"feoc").as("feoc"),
>   sum($"cfintbal").as("cfintbal"), count($"newacct" === 1).as("newacct")).persist()
> val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect
> z.length
> >>> res0: Int = 102
> res.unpersist()
> val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect
> z.length
> >>> res1: Int = 103



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
