GitHub user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212489210
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy {
             config)
     
           val filterCondition = postScanFilters.reduceLeftOption(And)
    -      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +
    +      val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) {
    +        // add a projection before FilterExec to ensure that the rows are converted to unsafe
    +        val filterExpr = filterCondition.get
    +        FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan))
    +      } else {
    +        filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +      }
     
          // always add the projection, which will produce unsafe rows required by some operators
    -      ProjectExec(project, withFilter) :: Nil
    +      if (project.exists(hasScalarPythonUDF)) {
    +        val references = project.map(_.references).reduce(_ ++ _).toSeq
    +        ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
    --- End diff --
    
    That projection was only added if there was a filter and that filter ran a Python UDF. This will add an unnecessary project when both the filter and the project have Python UDFs, but I thought that was probably okay. I can add a boolean to signal whether the filter already caused one to be added, if you think it's worth it.
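
    For reference, a rough sketch of that boolean-flag variant, reusing the names from the diff above (filterCondition, hasScalarPythonUDF, FilterExec, ProjectExec, scan, project). This is only illustrative, not a tested change to DataSourceV2Strategy:

        // Sketch: track whether the filter branch already inserted a ProjectExec,
        // so the project branch can skip the redundant inner projection.
        val filterCondition = postScanFilters.reduceLeftOption(And)

        val (withFilter, alreadyProjected) = filterCondition match {
          case Some(filterExpr) if hasScalarPythonUDF(filterExpr) =>
            // projection before FilterExec ensures the rows are converted to unsafe
            (FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan)), true)
          case Some(filterExpr) =>
            (FilterExec(filterExpr, scan), false)
          case None =>
            (scan, false)
        }

        // always add the top-level projection; only add the inner one when the
        // filter branch did not already guarantee unsafe rows
        if (project.exists(hasScalarPythonUDF) && !alreadyProjected) {
          val references = project.map(_.references).reduce(_ ++ _).toSeq
          ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
        } else {
          ProjectExec(project, withFilter) :: Nil
        }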

