[ https://issues.apache.org/jira/browse/SPARK-22702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-22702.
-------------------------------
    Resolution: Invalid

This should start on the mailing list. I must say I'm not clear on what
you're reporting. What is being printed? Why do you not expect this output?
Is there a simpler reproduction?
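
For reference, a smaller reproduction sketch (my reconstruction of what seems
to be reported, not code from the ticket): a println inside a UDF fires once
per row normally, but twice per row once a size() filter is applied to the
UDF's output column.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("size-filter-repro").master("local[2]").getOrCreate()
import spark.implicits._

// UDF whose side effect (println) reveals how often it is evaluated.
val tokens = udf { (name: String) =>
  println(s"udf($name)")
  Seq(name, name)
}

val df = Seq("file100", "file101").toDF("name")
  .withColumn("seq_name", tokens(col("name")))

df.show()                                   // udf(...) printed once per row
df.filter(size(col("seq_name")) > 1).show() // udf(...) printed twice per row
{code}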

> Spark SQL filter with size() function causes the UDF to be evaluated twice
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-22702
>                 URL: https://issues.apache.org/jira/browse/SPARK-22702
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 2.1.0
>         Environment: jdk1.7.0_67
> spark-hive_2.11
>            Reporter: chenfh5
>            Priority: Minor
>
> I ran into an issue with Spark SQL. After deriving a Dataset through some
> logic, I want to persist it, since it will be reused many times later.
> However, while persisting it, the logic is executed twice. I therefore wrote
> a local test to reproduce the issue, and it does reproduce.
> I tried three filter functions and found:
> .filter(col("id") > 10) //as expected
> .filter(length(col("name")) > 4) //as expected
> .filter(size(col("seq_name")) > 1) //unexpected: triggers a second evaluation
> i.e., the double-evaluation issue occurs when the size() filter is present.
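> A way to confirm this (my addition, reusing names from the reproduction code
> below): print the optimized plan and check whether the filter condition was
> rewritten to call the UDF directly:
> {code:scala}
> // Sketch: assumes tableRaw and seqColumnGeneratorExpected from the code below.
> val out = seqColumnGeneratorExpected(tableRaw)
>   .filter(size(col("seq_name")) > 1)
> out.explain(true)
> // If the optimized plan contains Filter (size(UDF(id, name)) > 1) below the
> // Project, the UDF is evaluated once for the filter and again for the output.
> {code}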
> h5. result image
> h6. expected
> !http://upload-images.jianshu.io/upload_images/2189341-51ee89377e7159ac.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240!
> h6. unexpected
> !http://upload-images.jianshu.io/upload_images/2189341-f87ee2047f0cf120.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240!
> h5. reproduce code
> {code:scala}
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{Dataset, Row, SparkSession}
> object TwiceCalculationReproducer {
>   private val separator = java.io.File.separator
>   private val dirName = new java.io.File("").getAbsolutePath + separator + "testOutput"
>   System.setProperty("spark.app.name", "TestController")
>   System.setProperty("spark.master", "local[2]")
>   private val ss = SparkSession.builder().config(new SparkConf()).enableHiveSupport().getOrCreate()
>   ss.sparkContext.hadoopConfiguration.set("fs.defaultFS", "file:///")
>   private val sc = ss.sparkContext
>   def main(args: Array[String]) {
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     fs.delete(new Path(dirName), true)
>     Thread.sleep(1000)
>     /*expected*/
>     val tableRaw = Dims.df
>     val tableNewExp = seqColumnGeneratorExpected(tableRaw)
>     tableNewExp.persist()
>     tableNewExp.show(10, 100)
>     /*unexpected*/
>     Thread.sleep(1000)
>     val tableNewUnexp = seqColumnGeneratorUnexpected(tableRaw)
>     tableNewUnexp.persist()
>     tableNewUnexp.show(10, 100)
>   }
>   /*normal: only adds the UDF-generated column*/
>   def seqColumnGeneratorExpected[T](ds: Dataset[T]) = {
>     ds.withColumn("seq_name", seqTokenUdf(col("id"), col("name")))
>   }
>   /*abnormal: the size() filter triggers a second evaluation of the UDF*/
>   def seqColumnGeneratorUnexpected[T](ds: Dataset[T]) = {
>     seqColumnGeneratorExpected(ds)
>         .filter(col("id") > 10) //as expected
>         .filter(length(col("name")) > 4) //as expected
>         .filter(size(col("seq_name")) > 1) //unexpected: UDF runs twice
>   }
>   /*validator udf*/
>   def seqTokenUdf = udf {
>     (id: Int, name: String) => {
>       /*validator 1: console print*/
>       println(name + "_" + id + "_" + System.currentTimeMillis())
>       /*validator 2: write a file, in case executor output does not reach the console*/
>       val fs = FileSystem.get(sc.hadoopConfiguration)
>       fs.setWriteChecksum(false)
>       val fileName = Seq(dirName, name + "_" + System.currentTimeMillis.toString) mkString separator
>       fs.create(new Path(fileName))
>       /*return*/
>       Seq[String](name, System.currentTimeMillis().toString)
>     }
>   }
>   /*test data mock*/
>   object Dims {
>     private val structTypes = StructType(Seq(
>       StructField("id", IntegerType),
>       StructField("name", StringType)
>     ))
>     private val data = List(
>       Row(100, "file100"),
>       Row(101, "file101"),
>       Row(102, "file102")
>     )
>     private val rdd = sc.parallelize(data)
>     val df = ss.createDataFrame(rdd, structTypes)
>   }
> }
> {code}
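> A possible workaround sketch (my assumption, not verified on 2.1.0):
> materialize the UDF output before applying the size() filter, so the filter
> reads cached rows instead of re-running the UDF. From Spark 2.3 on,
> udf(...).asNondeterministic() should also stop Catalyst from duplicating the
> call, but that API does not exist in 2.1.0.
> {code:scala}
> // Workaround sketch: cache and force evaluation before the size() filter.
> val materialized = seqColumnGeneratorExpected(tableRaw).persist()
> materialized.count() // materializes the cache while the plan is unfiltered
> val filtered = materialized
>   .filter(col("id") > 10)
>   .filter(length(col("name")) > 4)
>   .filter(size(col("seq_name")) > 1) // reads cached rows; UDF is not re-run
> filtered.show(10, 100)
> {code}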
> Is there any problem with the size() function?
> Kind regards,
> chenfh5



