[ https://issues.apache.org/jira/browse/SPARK-22702 ]
chenfh5 updated SPARK-22702:
----------------------------
Description: 
I am running into an issue with Spark SQL. After deriving a Dataset through some logic, I want to persist this Dataset because it will be reused many times later. However, when the persisted Dataset is materialized, the logic (a UDF) is calculated twice. I wrote a local test to reproduce the issue, and it does reproduce. I tried three filter conditions and found:

.filter(col("id") > 10) // as expected
.filter(length(col("name")) > 4) // as expected
.filter(size(col("seq_name")) > 1) // unexpected when this filter is present

i.e., the twice-calculation issue occurs when the filter on the UDF-generated column is present.

h5. result images
h6. expected
!http://upload-images.jianshu.io/upload_images/2189341-51ee89377e7159ac.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240!
h6. unexpected
!http://upload-images.jianshu.io/upload_images/2189341-f87ee2047f0cf120.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240!

h5. reproduce code
{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object TwiceCalculationReproducer {

  private val separator = java.io.File.separator
  private val dirName = new java.io.File("").getAbsolutePath + separator + "testOutput"

  System.setProperty("spark.app.name", "TestController")
  System.setProperty("spark.master", "local[2]")
  private val ss = SparkSession.builder().config(new SparkConf()).enableHiveSupport().getOrCreate()
  ss.sparkContext.hadoopConfiguration.set("fs.defaultFS", "file:///")
  private val sc = ss.sparkContext

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.delete(new Path(dirName), true)
    Thread.sleep(1000)

    /* expected: the UDF runs once per row */
    val tableRaw = Dims.df
    val tableNewExp = seqColumnGeneratorExpected(tableRaw)
    tableNewExp.persist()
    tableNewExp.show(10, 100)

    /* unexpected: the UDF runs twice per row */
    Thread.sleep(1000)
    val tableNewUnexp = seqColumnGeneratorUnexpected(tableRaw)
    tableNewUnexp.persist()
    tableNewUnexp.show(10, 100)
  }

  /* normal */
  def seqColumnGeneratorExpected[T](ds: Dataset[T]) = {
    ds.withColumn("seq_name", seqTokenUdf(col("id"), col("name")))
  }

  /* abnormal */
  def seqColumnGeneratorUnexpected[T](ds: Dataset[T]) = {
    seqColumnGeneratorExpected(ds)
      .filter(col("id") > 10) // as expected
      .filter(length(col("name")) > 4) // as expected
      .filter(size(col("seq_name")) > 1) // unexpected when this filter is present
  }

  /* validator UDF: leaves a trace for every invocation */
  def seqTokenUdf = udf {
    (id: Int, name: String) => {
      /* validator 1: console print */
      println(name + "_" + id + "_" + System.currentTimeMillis())

      /* validator 2: write a marker file, in case the executor does not print to the console */
      val fs = FileSystem.get(sc.hadoopConfiguration)
      fs.setWriteChecksum(false)
      val fileName = Seq(dirName, name + "_" + System.currentTimeMillis.toString).mkString(separator)
      fs.create(new Path(fileName)).close()

      /* return */
      Seq[String](name, System.currentTimeMillis().toString)
    }
  }

  /* test data mock */
  object Dims {
    private val structTypes = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)
    ))
    private val data = List(
      Row(100, "file100"),
      Row(101, "file101"),
      Row(102, "file102")
    )
    private val rdd = sc.parallelize(data)
    val df = ss.createDataFrame(rdd, structTypes)
  }
}
{code}
Is there any problem with the size() function?
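To help narrow this down, here is a sketch building on the repro above (it reuses Dims, seqColumnGeneratorExpected and seqTokenUdf from there; the persist-then-count workaround at the end is only an assumption on my side, not a confirmed fix). If seqTokenUdf shows up in both a Filter node and a Project node of the physical plan, it would be the UDF being re-evaluated rather than size() itself misbehaving:

{code:scala}
// Sketch, reusing Dims and seqColumnGeneratorExpected from the repro above.
val withSeq = seqColumnGeneratorExpected(Dims.df)

// Print the parsed/analyzed/optimized/physical plans; if seqTokenUdf appears
// in both the Filter and the Project, it runs twice per row passing the filter.
withSeq.filter(size(col("seq_name")) > 1).explain(true)

// Assumed workaround: cache and materialize the pre-filter Dataset, so the
// filter reads seq_name from the cached rows instead of recomputing the UDF.
val cached = withSeq.persist()
cached.count() // forces one full evaluation of seqTokenUdf
cached.filter(size(col("seq_name")) > 1).show(10, 100)
{code}

With the pre-filter Dataset cached this way, I would expect each marker file under testOutput to be written exactly once per input row.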
Kind regards,
chenfh5


> Spark sql filter with size function(if exists) leads twice calculation
> ----------------------------------------------------------------------
>
>                 Key: SPARK-22702
>                 URL: https://issues.apache.org/jira/browse/SPARK-22702
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 2.1.0
>        Environment: jdk1.7.0_67
> spark-hive_2.11
>            Reporter: chenfh5
>            Priority: Minor
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)