This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d770db  [SPARK-27968] ArrowEvalPythonExec.evaluate shouldn't eagerly 
read the first row
4d770db is described below

commit 4d770db0eb252c56072f093eae318bad3d20b8d7
Author: Xiangrui Meng <m...@databricks.com>
AuthorDate: Thu Jun 6 15:45:44 2019 -0700

    [SPARK-27968] ArrowEvalPythonExec.evaluate shouldn't eagerly read the first 
row
    
    ## What changes were proposed in this pull request?
    
    Issued fixed in https://github.com/apache/spark/pull/24734 but that PR 
might takes longer to merge.
    
    ## How was this patch tested?
    
    It should pass existing unit tests.
    
    Closes #24816 from mengxr/SPARK-27968.
    
    Authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 .../sql/execution/python/ArrowEvalPythonExec.scala | 27 ++++------------------
 1 file changed, 5 insertions(+), 22 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 000ae97..73a43af 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -86,28 +86,11 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
resultAttrs: Seq[Attribute]
       sessionLocalTimeZone,
       pythonRunnerConf).compute(batchIter, context.partitionId(), context)
 
-    new Iterator[InternalRow] {
-
-      private var currentIter = if (columnarBatchIter.hasNext) {
-        val batch = columnarBatchIter.next()
-        val actualDataTypes = (0 until batch.numCols()).map(i => 
batch.column(i).dataType())
-        assert(outputTypes == actualDataTypes, "Invalid schema from 
pandas_udf: " +
-          s"expected ${outputTypes.mkString(", ")}, got 
${actualDataTypes.mkString(", ")}")
-        batch.rowIterator.asScala
-      } else {
-        Iterator.empty
-      }
-
-      override def hasNext: Boolean = currentIter.hasNext || {
-        if (columnarBatchIter.hasNext) {
-          currentIter = columnarBatchIter.next().rowIterator.asScala
-          hasNext
-        } else {
-          false
-        }
-      }
-
-      override def next(): InternalRow = currentIter.next()
+    columnarBatchIter.flatMap { batch =>
+      val actualDataTypes = (0 until batch.numCols()).map(i => 
batch.column(i).dataType())
+      assert(outputTypes == actualDataTypes, "Invalid schema from pandas_udf: 
" +
+        s"expected ${outputTypes.mkString(", ")}, got 
${actualDataTypes.mkString(", ")}")
+      batch.rowIterator.asScala
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to