This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 3107c1e642b [SPARK-44361][SQL][FOLLOW-UP] Remove unused variables and
fix import statements
3107c1e642b is described below
commit 3107c1e642b5efe7fd88329197d912f72f711c80
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Wed Jul 19 18:02:27 2023 +0900
[SPARK-44361][SQL][FOLLOW-UP] Remove unused variables and fix import
statements
### What changes were proposed in this pull request?
This PR is a followup of https://github.com/apache/spark/pull/42024 that
removes unused variables and fixes import statements (which should be part of
the whole refactoring).
### Why are the changes needed?
To properly clean up.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests should cover this.
Closes #42068 from HyukjinKwon/SPARK-44361-followup.
Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit bca28f87ae12ffe3b49c78503af580b503f120ee)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../python/MapInBatchEvaluatorFactory.scala | 25 ++++++++++------------
.../sql/execution/python/MapInBatchExec.scala | 6 ------
2 files changed, 11 insertions(+), 20 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
index efb063476a4..1e15aa7f777 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
@@ -19,9 +19,8 @@ package org.apache.spark.sql.execution.python
import scala.collection.JavaConverters._
-import org.apache.spark.{ContextAwareIterator, TaskContext}
-import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
-import org.apache.spark.api.python.{ChainedPythonFunctions}
+import org.apache.spark.{ContextAwareIterator, PartitionEvaluator,
PartitionEvaluatorFactory, TaskContext}
+import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -76,17 +75,15 @@ class MapInBatchEvaluatorFactory(
val unsafeProj = UnsafeProjection.create(output, output)
- columnarBatchIter
- .flatMap { batch =>
- // Scalar Iterator UDF returns a StructType column in ColumnarBatch,
select
- // the children here
- val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
- val outputVectors = output.indices.map(structVector.getChild)
- val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
- flattenedBatch.setNumRows(batch.numRows())
- flattenedBatch.rowIterator.asScala
- }
- .map(unsafeProj)
+ columnarBatchIter.flatMap { batch =>
+ // Scalar Iterator UDF returns a StructType column in ColumnarBatch,
select
+ // the children here
+ val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
+ val outputVectors = output.indices.map(structVector.getChild)
+ val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
+ flattenedBatch.setNumRows(batch.numRows())
+ flattenedBatch.rowIterator.asScala
+ }.map(unsafeProj)
}
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
index 0703f57c33d..368184934fa 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
@@ -39,14 +39,8 @@ trait MapInBatchExec extends UnaryExecNode with
PythonSQLMetrics {
protected val isBarrier: Boolean
- private val pythonFunction = func.asInstanceOf[PythonUDF].func
-
override def producedAttributes: AttributeSet = AttributeSet(output)
- private val batchSize = conf.arrowMaxRecordsPerBatch
-
- private val largeVarTypes = conf.arrowUseLargeVarTypes
-
private[this] val jobArtifactUUID =
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
override def outputPartitioning: Partitioning = child.outputPartitioning
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]