Hi,

The example that I provided is not very clear, so I have added a clearer example in JIRA.
Thanks

Cheers
Gen

On Wed, Feb 22, 2017 at 3:47 PM, gen tang <gen.tan...@gmail.com> wrote:
> Hi Kazuaki Ishizaki,
>
> Thanks a lot for your help. It works. However, an even stranger bug appears,
> as follows:
>
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.SparkSession
>
> def f(path: String, spark: SparkSession): DataFrame = {
>   val data = spark.read.option("mergeSchema", "true").parquet(path)
>   println(data.count)
>   val df = data.filter("id>10")
>   df.cache
>   println(df.count)
>   val df1 = df.filter("id>11")
>   df1.cache
>   println(df1.count)
>   df1
> }
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir)
> f(dir, spark).count // output 88, which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir)
> f(dir, spark).count // output 88, which is incorrect
>
> If we move refreshByPath into f(), just before spark.read, the whole code
> works fine.
>
> Any idea? Thanks a lot
>
> Cheers
> Gen
>
>
> On Wed, Feb 22, 2017 at 2:22 PM, Kazuaki Ishizaki <ishiz...@jp.ibm.com>
> wrote:
>
>> Hi,
>> Thank you for pointing out the JIRA.
>> I think this JIRA suggests that you insert
>> "spark.catalog.refreshByPath(dir)".
>>
>> val dir = "/tmp/test"
>> spark.range(100).write.mode("overwrite").parquet(dir)
>> val df = spark.read.parquet(dir)
>> df.count // output 100, which is correct
>> f(df).count // output 89, which is correct
>>
>> spark.range(1000).write.mode("overwrite").parquet(dir)
>> spark.catalog.refreshByPath(dir) // insert a NEW statement
>> val df1 = spark.read.parquet(dir)
>> df1.count // output 1000, which is correct; in fact, every operation except
>> df1.filter("id>10") returns the correct result.
>> f(df1).count // output 89, which is incorrect
>>
>> Regards,
>> Kazuaki Ishizaki
>>
>>
>> From: gen tang <gen.tan...@gmail.com>
>> To: dev@spark.apache.org
>> Date: 2017/02/22 15:02
>> Subject: Re: A DataFrame cache bug
>> ------------------------------
>>
>> Hi All,
>>
>> I might have found a related issue on JIRA:
>>
>> https://issues.apache.org/jira/browse/SPARK-15678
>>
>> This issue is closed; maybe we should reopen it.
>>
>> Thanks
>>
>> Cheers
>> Gen
>>
>>
>> On Wed, Feb 22, 2017 at 1:57 PM, gen tang <gen.tan...@gmail.com> wrote:
>> Hi All,
>>
>> I found a strange bug related to reading data from an updated path and the
>> cache operation.
>> Please consider the following code:
>>
>> import org.apache.spark.sql.DataFrame
>>
>> def f(data: DataFrame): DataFrame = {
>>   val df = data.filter("id>10")
>>   df.cache
>>   df.count
>>   df
>> }
>>
>> f(spark.range(100).asInstanceOf[DataFrame]).count  // output 89, which is correct
>> f(spark.range(1000).asInstanceOf[DataFrame]).count // output 989, which is correct
>>
>> val dir = "/tmp/test"
>> spark.range(100).write.mode("overwrite").parquet(dir)
>> val df = spark.read.parquet(dir)
>> df.count // output 100, which is correct
>> f(df).count // output 89, which is correct
>>
>> spark.range(1000).write.mode("overwrite").parquet(dir)
>> val df1 = spark.read.parquet(dir)
>> df1.count // output 1000, which is correct; in fact, every operation except
>> df1.filter("id>10") returns the correct result.
>> f(df1).count // output 89, which is incorrect
>>
>> In fact, when we use df1.filter("id>10"), Spark nevertheless uses the old
>> cached DataFrame.
>>
>> Any idea? Thanks a lot
>>
>> Cheers
>> Gen
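For reference, below is a minimal sketch of the workaround described in the thread above, i.e. moving spark.catalog.refreshByPath(path) inside f(), just before spark.read. It only restates what the thread reports; the function name f, the mergeSchema option, and the filter conditions are taken from the example above, and the sketch has not been verified against a particular Spark version.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

// Workaround sketch: refresh the catalog's cached data/metadata for `path`
// inside f(), immediately before reading, so the subsequent filter/cache/count
// operate on the freshly written files rather than on stale cache entries.
def f(path: String, spark: SparkSession): DataFrame = {
  spark.catalog.refreshByPath(path) // moved here, just before spark.read
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

According to the thread, with this change f(dir, spark).count returns the expected counts after each overwrite of the path, whereas calling refreshByPath outside f() did not help.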