This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0f9cca5b419 [SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too early 0f9cca5b419 is described below commit 0f9cca5b419b09f25c45904aa81bf0515f9e7c44 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Mon Jul 31 09:16:02 2023 +0900 [SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too early ### What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/41839, to fix an unintentional change. That PR added an optimization to return an empty iterator directly if the input iterator is empty. However, checking `inputIterator.hasNext` may trigger query execution, which is different from before. It should be completely lazy and wait for the root operator's iterator to trigger the execution. ### Why are the changes needed? Fix an unintentional change. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #42226 from cloud-fan/fo. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../sql/execution/ColumnarEvaluatorFactory.scala | 57 ++++++++++------------ 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala index 949722d3cc2..960d4b74a1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala @@ -70,42 +70,37 @@ class RowToColumnarEvaluatorFactory( inputs: Iterator[InternalRow]*): Iterator[ColumnarBatch] = { assert(inputs.length == 1) val rowIterator = inputs.head + new Iterator[ColumnarBatch] { + private lazy val converters = new RowToColumnConverter(schema) + private lazy val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) { + OffHeapColumnVector.allocateColumns(numRows, schema) + } else { + OnHeapColumnVector.allocateColumns(numRows, schema) + } + private lazy val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) - if (rowIterator.hasNext) { - new Iterator[ColumnarBatch] { - private val converters = new RowToColumnConverter(schema) - private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) { - OffHeapColumnVector.allocateColumns(numRows, schema) - } else { - OnHeapColumnVector.allocateColumns(numRows, schema) - } - private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) - - TaskContext.get().addTaskCompletionListener[Unit] { _ => - cb.close() - } + TaskContext.get().addTaskCompletionListener[Unit] { _ => + cb.close() + } - override def hasNext: Boolean = { - rowIterator.hasNext - } + override def hasNext: Boolean = { + rowIterator.hasNext + } - override def next(): ColumnarBatch = { - cb.setNumRows(0) - vectors.foreach(_.reset()) - var rowCount = 0 - while 
(rowCount < numRows && rowIterator.hasNext) { - val row = rowIterator.next() - converters.convert(row, vectors.toArray) - rowCount += 1 - } - cb.setNumRows(rowCount) - numInputRows += rowCount - numOutputBatches += 1 - cb + override def next(): ColumnarBatch = { + cb.setNumRows(0) + vectors.foreach(_.reset()) + var rowCount = 0 + while (rowCount < numRows && rowIterator.hasNext) { + val row = rowIterator.next() + converters.convert(row, vectors.toArray) + rowCount += 1 } + cb.setNumRows(rowCount) + numInputRows += rowCount + numOutputBatches += 1 + cb } - } else { - Iterator.empty } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org