Hi,
Thank you for pointing out the JIRA.
I think that this JIRA suggests you to insert
"spark.catalog.refreshByPath(dir)".
val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir)
df.count // output 100 which is correct
f(df).count // output 89 which is correct
spark.range(1000).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir) // insert a NEW statement
val df1 = spark.read.parquet(dir)
df1.count // output 1000 which is correct, in fact other operation expect
df1.filter("id>10") return correct result.
f(df1).count // output 89 which is incorrect
Regards,
Kazuaki Ishizaki
From: gen tang <[email protected]>
To: [email protected]
Date: 2017/02/22 15:02
Subject: Re: A DataFrame cache bug
Hi All,
I might find a related issue on jira:
https://issues.apache.org/jira/browse/SPARK-15678
This issue is closed, may be we should reopen it.
Thanks
Cheers
Gen
On Wed, Feb 22, 2017 at 1:57 PM, gen tang <[email protected]> wrote:
Hi All,
I found a strange bug which is related with reading data from a updated
path and cache operation.
Please consider the following code:
import org.apache.spark.sql.DataFrame
def f(data: DataFrame): DataFrame = {
val df = data.filter("id>10")
df.cache
df.count
df
}
f(spark.range(100).asInstanceOf[DataFrame]).count // output 89 which is
correct
f(spark.range(1000).asInstanceOf[DataFrame]).count // output 989 which is
correct
val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir)
df.count // output 100 which is correct
f(df).count // output 89 which is correct
spark.range(1000).write.mode("overwrite").parquet(dir)
val df1 = spark.read.parquet(dir)
df1.count // output 1000 which is correct, in fact other operation expect
df1.filter("id>10") return correct result.
f(df1).count // output 89 which is incorrect
In fact when we use df1.filter("id>10"), spark will however use old cached
dataFrame
Any idea? Thanks a lot
Cheers
Gen