This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new c342bcd Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop consumin…" c342bcd is described below commit c342bcd4c4ba68506ca6b459bd3a9c688d2aecfa Author: HyukjinKwon <gurwls...@apache.org> AuthorDate: Thu Nov 5 16:16:07 2020 +0900 Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop consumin…" This reverts commit cabf9571cd5bc620b5cddf5a4d003f29ed5b5459. --- python/pyspark/sql/tests.py | 42 ---------------------- .../sql/execution/python/EvalPythonExec.scala | 18 +--------- 2 files changed, 1 insertion(+), 59 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 8a25311..b995227 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3628,26 +3628,6 @@ class SQLTests(ReusedSQLTestCase): finally: self.spark.catalog.dropTempView("v") - # SPARK-33277 - def test_udf_with_column_vector(self): - path = tempfile.mkdtemp() - shutil.rmtree(path) - - try: - self.spark.range(0, 100000, 1, 1).write.parquet(path) - - def f(x): - return 0 - - fUdf = udf(f, LongType()) - - for offheap in ["true", "false"]: - with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}): - self.assertEquals( - self.spark.read.parquet(path).select(fUdf('id')).head(), Row(0)) - finally: - shutil.rmtree(path) - class HiveSparkSubmitTests(SparkSubmitTests): @@ -5595,28 +5575,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase): finally: shutil.rmtree(path) - # SPARK-33277 - def test_pandas_udf_with_column_vector(self): - import pandas as pd - from pyspark.sql.functions import pandas_udf - - path = tempfile.mkdtemp() - shutil.rmtree(path) - - try: - self.spark.range(0, 200000, 1, 1).write.parquet(path) - - @pandas_udf(LongType()) - def udf(x): - return pd.Series([0] * len(x)) - - for offheap in ["true", "false"]: - with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}): - self.assertEquals( - self.spark.read.parquet(path).select(udf('id')).head(), Row(0)) - finally: - shutil.rmtree(path) - @unittest.skipIf( not _have_pandas or not _have_pyarrow, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 293a7c0..942a6db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -88,7 +88,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil inputRDD.mapPartitions { iter => val context = TaskContext.get() - val contextAwareIterator = new ContextAwareIterator(iter, context) // The queue used to buffer input rows so we can drain it to // combine input with output from Python. @@ -120,7 +119,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil }) // Add rows to queue to join later with the result. - val projectedRowIter = contextAwareIterator.map { inputRow => + val projectedRowIter = iter.map { inputRow => queue.add(inputRow.asInstanceOf[UnsafeRow]) projection(inputRow) } @@ -137,18 +136,3 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } } } - -/** - * A TaskContext aware iterator. - * - * As the Python evaluation consumes the parent iterator in a separate thread, - * it could consume more data from the parent even after the task ends and the parent is closed. - * Thus, we should use ContextAwareIterator to stop consuming after the task ends. - */ -class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] { - - override def hasNext: Boolean = - !context.isCompleted() && !context.isInterrupted() && iter.hasNext - - override def next(): IN = iter.next() -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org