[ https://issues.apache.org/jira/browse/SPARK-22702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

chenfh5 updated SPARK-22702:
----------------------------
    Description: 
I have run into an issue with Spark SQL. After building a Dataset through some logic, I want to persist it because it will be reused many times later. However, when I persist it, the logic is calculated twice. I wrote a local test to reproduce the issue, and it does reproduce.

I tested three filter conditions and found the following:
.filter(col("id") > 10) // expected: the UDF is calculated once
.filter(length(col("name")) > 4) // expected: the UDF is calculated once
.filter(size(col("seq_name")) > 1) // unexpected: the UDF is calculated twice when this filter is present

That is, the double calculation occurs when the size() filter on the UDF-generated column is present.
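
Using the names from the reproduction code below, a quick way to see the duplication (just a sketch, not part of the original run) is to print the extended plan and check whether the UDF expression shows up in both the Filter and the Project:

{code:scala}
// Sketch only: inspect the plans of the unexpected case and look for the UDF
// being evaluated more than once.
val withFilter = seqColumnGeneratorUnexpected(Dims.df)
withFilter.explain(true)
{code}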

h5. result images
h6. expected
!http://upload-images.jianshu.io/upload_images/2189341-51ee89377e7159ac.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240!

h6. unexpected
!http://upload-images.jianshu.io/upload_images/2189341-f87ee2047f0cf120.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240!

h5. reproduction code
{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataset, Row, SparkSession}


object TwiceCalculationReproducer {
  private val separator = scala.reflect.io.File.separator
  private val dirName = new java.io.File("").getAbsolutePath + separator + "testOutput"

  System.setProperty("spark.app.name", "TestController")
  System.setProperty("spark.master", "local[2]")
  private val ss = SparkSession.builder().config(new SparkConf()).enableHiveSupport().getOrCreate()
  ss.sparkContext.hadoopConfiguration.set("fs.defaultFS", "file:///")
  private val sc = ss.sparkContext

  def main(args: Array[String]) {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.delete(new Path(dirName), true)
    Thread.sleep(1000)

    /*expected*/
    val tableRaw = Dims.df
    val tableNewExp = seqColumnGeneratorExpected(tableRaw)
    tableNewExp.persist()
    tableNewExp.show(10, 100)

    /*unexpected*/
    Thread.sleep(1000)
    val tableNewUnexp = seqColumnGeneratorUnexpected(tableRaw)
    tableNewUnexp.persist()
    tableNewUnexp.show(10, 100)
  }

  /* normal case: add the UDF-generated seq_name column */
  def seqColumnGeneratorExpected[T](ds: Dataset[T]) = {
    ds.withColumn("seq_name", seqTokenUdf(col("id"), col("name")))
  }

  /* abnormal case: the same column plus the three filters */
  def seqColumnGeneratorUnexpected[T](ds: Dataset[T]) = {
    seqColumnGeneratorExpected(ds)
        .filter(col("id") > 10) // expected: UDF calculated once
        .filter(length(col("name")) > 4) // expected: UDF calculated once
        .filter(size(col("seq_name")) > 1) // unexpected: UDF calculated twice when this filter is present
  }

  /* validator UDF: each invocation prints to the console and writes a marker file,
     so double calculation is visible even if executor output does not reach the console */
  def seqTokenUdf = udf {
    (id: Int, name: String) => {
      /* validator 1: console print */
      println(name + "_" + id + "_" + System.currentTimeMillis())

      /* validator 2: write a marker file, in case the executor does not print to the console */
      val fs = FileSystem.get(sc.hadoopConfiguration)
      fs.setWriteChecksum(false)
      val fileName = Seq(dirName, name + "_" + System.currentTimeMillis.toString) mkString separator
      fs.create(new Path(fileName))

      /* return the generated seq_name value */
      Seq[String](name, System.currentTimeMillis().toString)
    }
  }

  /*test data mock*/
  object Dims {
    private val structTypes = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)
    ))

    private val data = List(
      Row(100, "file100"),
      Row(101, "file101"),
      Row(102, "file102")
    )

    private val rdd = sc.parallelize(data)
    val df = ss.createDataFrame(rdd, structTypes)
  }

}
{code}

Is there a problem with the size() function?
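
For reference, here is a workaround sketch I would try (untested; it assumes the extra calculation comes from the size() filter re-evaluating the UDF): persist and materialize the Dataset with the generated column before applying the size() filter, so the filter reads the cached column instead of re-running the UDF.

{code:scala}
// Workaround sketch (untested assumption): cache the UDF output first, then filter on it.
val withSeq = seqColumnGeneratorExpected(Dims.df).persist()
withSeq.count() // force materialization so seq_name is computed exactly once

val filtered = withSeq
    .filter(col("id") > 10)
    .filter(length(col("name")) > 4)
    .filter(size(col("seq_name")) > 1)
filtered.show(10, 100)
{code}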

Kind regards,
chenfh5


> Spark sql filter with size function(if exists) leads twice calculation
> ----------------------------------------------------------------------
>
>                 Key: SPARK-22702
>                 URL: https://issues.apache.org/jira/browse/SPARK-22702
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 2.1.0
>         Environment: jdk1.7.0_67
> spark-hive_2.11
>            Reporter: chenfh5
>            Priority: Minor
>


