Hi Kazuaki Ishizaki,

Thanks a lot for your help. It works. However, an even stranger bug appears, as follows:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

def f(path: String, spark: SparkSession): DataFrame = {
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // output 88 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // output 88 which is incorrect

If we move refreshByPath into f(), just before spark.read, the whole code works fine (a sketch of this variant is appended at the end of this message).

Any idea? Thanks a lot

Cheers
Gen

On Wed, Feb 22, 2017 at 2:22 PM, Kazuaki Ishizaki <ishiz...@jp.ibm.com> wrote:

> Hi,
> Thank you for pointing out the JIRA.
> I think that this JIRA suggests that you insert "spark.catalog.refreshByPath(dir)".
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> val df = spark.read.parquet(dir)
> df.count // output 100 which is correct
> f(df).count // output 89 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir) // insert a NEW statement
> val df1 = spark.read.parquet(dir)
> df1.count // output 1000 which is correct; in fact, operations other than df1.filter("id>10") return correct results.
> f(df1).count // output 89 which is incorrect
>
> Regards,
> Kazuaki Ishizaki
>
> From: gen tang <gen.tan...@gmail.com>
> To: dev@spark.apache.org
> Date: 2017/02/22 15:02
> Subject: Re: A DataFrame cache bug
> ------------------------------
>
> Hi All,
>
> I might have found a related issue on JIRA:
>
> https://issues.apache.org/jira/browse/SPARK-15678
>
> This issue is closed; maybe we should reopen it.
>
> Thanks
>
> Cheers
> Gen
>
>
> On Wed, Feb 22, 2017 at 1:57 PM, gen tang <gen.tan...@gmail.com> wrote:
>
> Hi All,
>
> I found a strange bug related to reading data from an updated path combined with a cache operation.
> Please consider the following code:
>
> import org.apache.spark.sql.DataFrame
>
> def f(data: DataFrame): DataFrame = {
>   val df = data.filter("id>10")
>   df.cache
>   df.count
>   df
> }
>
> f(spark.range(100).asInstanceOf[DataFrame]).count // output 89 which is correct
> f(spark.range(1000).asInstanceOf[DataFrame]).count // output 989 which is correct
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> val df = spark.read.parquet(dir)
> df.count // output 100 which is correct
> f(df).count // output 89 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> val df1 = spark.read.parquet(dir)
> df1.count // output 1000 which is correct; in fact, operations other than df1.filter("id>10") return correct results.
> f(df1).count // output 89 which is incorrect
>
> In fact, when we use df1.filter("id>10"), Spark nevertheless uses the old cached DataFrame.
>
> Any idea? Thanks a lot
>
> Cheers
> Gen
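P.S. For completeness, here is a minimal sketch of the variant mentioned above, i.e. with spark.catalog.refreshByPath moved into f() just before spark.read; everything else is the same repro code. The 988 in the last comment is not a new measurement, just the expected count for range(1000) filtered by id>11, given the observation that this version works fine.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

def f(path: String, spark: SparkSession): DataFrame = {
  // Moved here: refresh the catalog's cached data/metadata for this path
  // before reading, so each call of f() sees the newly written files.
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // 88, correct

spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // expected 988 (the correct result) with this variant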