Utkarsh Sharma created SPARK-26974:
--------------------------------------
Summary: Invalid data in grouped cached dataset, formed by joining
a large cached dataset with a small dataset
Key: SPARK-26974
URL: https://issues.apache.org/jira/browse/SPARK-26974
Project: Spark
Issue Type: Bug
Components: Java API, Spark Core, SQL
Affects Versions: 2.2.0
Reporter: Utkarsh Sharma
The initial datasets are derived from Hive tables using the spark.table()
function.
Dataset descriptions:
*+Sales+* dataset (close to 10 billion rows) with the following columns (and
sample rows):
||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
|1|1|20|
|1|2|30|
|2|1|40|
*+Customer+* dataset (close to 50,000 rows) with the following columns (and
sample rows):
||CustomerId (bigint)||CustomerGrpNbr (smallint)||
|1|1|
|2|2|
|3|1|
I am doing the following steps:
# Caching sales dataset with close to 10 billion rows.
# Doing an inner join of 'sales' with 'customer' dataset
# Doing a group by on the resultant dataset, based on the CustomerGrpNbr column, to
get sum(qty_sold) and stddev(qty_sold) values for the customer groups.
# Caching the resultant grouped dataset.
# Doing a .count() on the grouped dataset.
The count in step 5 is supposed to return only 20, because
customer.select("CustomerGrpNbr").distinct().count() returns 20.
However, step 5 returns a value of around 65,000 instead.
Following are the commands I am running in spark-shell:
{code:java}
var sales = spark.table("sales_table")
var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId")
  .groupBy("CustomerGrpNbr")
  .agg(sum("qty_sold"), stddev("qty_sold"))
sales.cache()
finalDf.cache()
finalDf.count() // returns around 65k, and the count keeps varying on each run
customer.select("CustomerGrpNbr").distinct().count() // returns 20{code}
I have been able to replicate the same behavior using the Java API as well.
This anomalous behavior disappears, however, when I remove the caching
statements. That is, if I run the following in spark-shell, it works as expected:
{code:java}
var sales = spark.table("sales_table")
var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId")
  .groupBy("CustomerGrpNbr")
  .agg(sum("qty_sold"), stddev("qty_sold"))
finalDf.count() // returns 20
customer.select("CustomerGrpNbr").distinct().count() // returns 20
{code}
The tables in hive from which the datasets are built do not change during this
entire process. So why does the caching cause this problem?
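For reference, the reproduction above can be condensed into a self-contained local sketch with small synthetic stand-ins for the Hive tables (the in-memory data and column values here are hypothetical; at this tiny scale the counts are expected to agree, since the reported mismatch has only been observed at the original ~10 billion row scale):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sum, stddev}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("SPARK-26974-repro")
  .getOrCreate()
import spark.implicits._

// Hypothetical in-memory stand-ins for the Hive tables in the report.
val sales = Seq((1L, 1L, 20L), (1L, 2L, 30L), (2L, 1L, 40L))
  .toDF("ItemId", "CustomerId", "qty_sold")
val customer = Seq((1L, 1), (2L, 2), (3L, 1))
  .toDF("CustomerId", "CustomerGrpNbr")

// Same sequence as the bug report: cache, join, group by, cache, count.
sales.cache()
val finalDf = sales.join(customer, "CustomerId")
  .groupBy("CustomerGrpNbr")
  .agg(sum("qty_sold"), stddev("qty_sold"))
finalDf.cache()

// Both counts should agree: one row per distinct customer group with sales.
val groupedCount = finalDf.count()
val distinctGroups = customer.select("CustomerGrpNbr").distinct().count()

spark.stop()
```

On this toy data both counts come out to 2 (groups 1 and 2), matching the expected behavior; the report is that the cached variant diverges only on the large dataset.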
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)