[ https://issues.apache.org/jira/browse/SPARK-11330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Saif Addin Ellafi updated SPARK-11330:
--------------------------------------
    Description: 
ONLY HAPPENS WHEN PERSIST() IS CALLED

val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt")
data.groupBy("vintages").count.select("vintages").filter("vintages = 
'2007-01-01'").first
>>> res9: org.apache.spark.sql.Row = [2007-01-01]

data.groupBy("vintages").count.persist.select("vintages").filter("vintages = 
'2007-01-01'").first
>>> Exception on empty iterator stuff

This does not happen with fields of other types, e.g. IntegerType:
data.groupBy("yyyymm").count.persist.select("yyyymm").filter("yyyymm = 200805").first
>>> res13: org.apache.spark.sql.Row = [200805]

NOTE: I do not know whether the Kryo serializer was used when this parquet data was written.
NOTE2: I have found a workaround: call persist after the filter instead of before it.
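
For reference, a minimal sketch of that workaround (the same query as above, with persist simply moved after the filter):

// Workaround sketch: persisting after the filter returns the expected row
data.groupBy("vintages").count.select("vintages").filter("vintages = '2007-01-01'").persist.first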

Querying the raw data works fine:

data.select("vintages").filter("vintages = '2007-01-01'").first >>> res4: 
org.apache.spark.sql.Row = [2007-01-01]

Originally, the issue appeared in a larger aggregation operation: the results were inconsistent, with different values returned on every call.

Reducing the operation step by step, I narrowed it down to this issue.
In any case, the original operation was:

val data = sqlContext.read.parquet("/var/Saif/data_pqt")

val res = data.groupBy("product", "band", "age", "vint", "mb", "yyyymm").agg(
  count($"account_id").as("N"),
  sum($"balance").as("balance_eom"),
  sum($"balancec").as("balance_eoc"),
  sum($"spend").as("spend"),
  sum($"payment").as("payment"),
  sum($"feoc").as("feoc"),
  sum($"cfintbal").as("cfintbal"),
  count($"newacct" === 1).as("newacct")
).persist()

val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect

z.length

>>> res0: Int = 102

res.unpersist()

val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect

z.length

>>> res1: Int = 103
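
As a sketch only (the names res2 and z2 are hypothetical, and the aggregation is reduced to one column for brevity), the workaround from NOTE2 applied to this operation would filter first and persist afterwards, so repeated reads should agree:

// Assumes a spark-shell session where the sqlContext implicits ($) and
// org.apache.spark.sql.functions (count) are already in scope.
val res2 = data.groupBy("product", "band", "age", "vint", "mb", "yyyymm")
  .agg(count($"account_id").as("N"))
val z2 = res2.filter("vint = '2007-01-01'").select("yyyymm").distinct.persist()
z2.count  // expected to match the unpersisted count above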


> Filter operation on StringType after groupBy PERSISTED brings no results when there are
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-11330
>                 URL: https://issues.apache.org/jira/browse/SPARK-11330
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.1
>         Environment: Standalone cluster of five servers (also happens in 
> local mode). sqlContext is an instance of HiveContext (also happens with 
> SQLContext). No special options other than driver memory and executor 
> memory. Parquet data has 512 partitions over 160 cores (also happens with 
> other partitionings). Data is nearly 2 billion rows.
>            Reporter: Saif Addin Ellafi
>            Priority: Blocker
>


