Hi Kazuaki Ishizaki,
Thanks a lot for your help. It works. However, an even stranger bug appears,
as follows:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
def f(path: String, spark: SparkSession): DataFrame = {
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}
val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // output 88 which is correct
spark.range(1000).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // output 88 which is incorrect (should be 988)
If we move refreshByPath into f(), just before spark.read, the whole code
works fine.
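That is, something like this (a sketch of the change; the rest of f() is
unchanged):

def f(path: String, spark: SparkSession): DataFrame = {
  // refresh Spark's cached metadata for this path before reading,
  // so that the newly written files are picked up
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}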
Any ideas? Thanks a lot
Cheers
Gen
On Wed, Feb 22, 2017 at 2:22 PM, Kazuaki Ishizaki <[email protected]>
wrote:
> Hi,
> Thank you for pointing out the JIRA.
> I think that this JIRA suggests that you insert
> "spark.catalog.refreshByPath(dir)".
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> val df = spark.read.parquet(dir)
> df.count // output 100 which is correct
> f(df).count // output 89 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir) // insert a NEW statement
> val df1 = spark.read.parquet(dir)
> df1.count // output 1000 which is correct; in fact, operations other than
> df1.filter("id>10") return correct results
> f(df1).count // output 89 which is incorrect (should be 989)
>
> Regards,
> Kazuaki Ishizaki
>
>
>
> From: gen tang <[email protected]>
> To: [email protected]
> Date: 2017/02/22 15:02
> Subject: Re: A DataFrame cache bug
> ------------------------------
>
>
>
> Hi All,
>
> I might have found a related issue on JIRA:
>
> https://issues.apache.org/jira/browse/SPARK-15678
>
> This issue is closed; maybe we should reopen it.
>
> Thanks
>
> Cheers
> Gen
>
>
> On Wed, Feb 22, 2017 at 1:57 PM, gen tang <[email protected]> wrote:
> Hi All,
>
> I found a strange bug related to reading data from an updated path
> combined with the cache operation.
> Please consider the following code:
>
> import org.apache.spark.sql.DataFrame
>
> def f(data: DataFrame): DataFrame = {
>   val df = data.filter("id>10")
>   df.cache
>   df.count
>   df
> }
>
> f(spark.range(100).toDF).count // output 89 which is correct
> f(spark.range(1000).toDF).count // output 989 which is correct
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> val df = spark.read.parquet(dir)
> df.count // output 100 which is correct
> f(df).count // output 89 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> val df1 = spark.read.parquet(dir)
> df1.count // output 1000 which is correct; in fact, operations other than
> df1.filter("id>10") return correct results
> f(df1).count // output 89 which is incorrect (should be 989)
>
> In fact, when we call df1.filter("id>10"), Spark uses the old cached
> DataFrame.
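> One quick way to see this (just a sanity check, assuming the stale cached
> plan is indeed being substituted) is to look at the physical plan:
>
> df1.filter("id>10").explain
> // if the stale cache is picked up, the physical plan shows an
> // InMemoryTableScan over an InMemoryRelation instead of a fresh parquet scan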
>
> Any ideas? Thanks a lot
>
> Cheers
> Gen
>