[
https://issues.apache.org/jira/browse/SPARK-15678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877735#comment-15877735
]
Gen TANG edited comment on SPARK-15678 at 2/24/17 10:44 AM:
------------------------------------------------------------
Hi, All
It seems that refreshByPath(_path_) should be called 2 times if there are n (n
> 1) cache operation on dataFrame from the _path_.
Therefore please reopen this issue
{code:title=not work code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
def f(path: String, spark: SparkSession): DataFrame = {
spark.catalog.refreshByPath(path)
val data = spark.read.option("mergeSchema", "true").parquet(path)
println(data.count) // always correct
val df = data.filter("id>10")
df.cache
println(df.count) // always correct
val df1 = df.filter("id>11")
df1.cache
println(df1.count)
df1
}
val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is correct
spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is incorrect
{code}
{code:title=work code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
def f(path: String, spark: SparkSession): DataFrame = {
spark.catalog.refreshByPath(path)
spark.catalog.refreshByPath(path)
val data = spark.read.option("mergeSchema", "true").parquet(path)
println(data.count)
val df = data.filter("id>10")
df.cache
println(df.count)
val df1 = df.filter("id>11")
df1.cache
println(df1.count)
df1
}
val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is correct
spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 988 which is incorrect
{code}
was (Author: gen):
Hi, All
It seems that refreshByPath(_path_) should be called 2 times if there are n (n
> 1) cache operation on dataFrame from the _path_. So please reopen this issue
{code:title=not work code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
def f(path: String, spark: SparkSession): DataFrame = {
spark.catalog.refreshByPath(path)
val data = spark.read.option("mergeSchema", "true").parquet(path)
println(data.count) // always correct
val df = data.filter("id>10")
df.cache
println(df.count) // always correct
val df1 = df.filter("id>11")
df1.cache
println(df1.count)
df1
}
val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is correct
spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is incorrect
{code}
{code:title=work code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
def f(path: String, spark: SparkSession): DataFrame = {
spark.catalog.refreshByPath(path)
spark.catalog.refreshByPath(path)
val data = spark.read.option("mergeSchema", "true").parquet(path)
println(data.count)
val df = data.filter("id>10")
df.cache
println(df.count)
val df1 = df.filter("id>11")
df1.cache
println(df1.count)
df1
}
val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is correct
spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 988 which is incorrect
{code}
> Not use cache on appends and overwrites
> ---------------------------------------
>
> Key: SPARK-15678
> URL: https://issues.apache.org/jira/browse/SPARK-15678
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.0
> Reporter: Sameer Agarwal
> Assignee: Sameer Agarwal
> Fix For: 2.0.0
>
>
> SparkSQL currently doesn't drop caches if the underlying data is overwritten.
> {code}
> val dir = "/tmp/test"
> sqlContext.range(1000).write.mode("overwrite").parquet(dir)
> val df = sqlContext.read.parquet(dir).cache()
> df.count() // outputs 1000
> sqlContext.range(10).write.mode("overwrite").parquet(dir)
> sqlContext.read.parquet(dir).count() // outputs 1000 instead of 10 <---- We
> are still using the cached dataset
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]