This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
     new 18449a29e3 [GLUTEN-10524] Remove unnecessary `outputAttributes` from `BasicScanExecTransformer` (#10525)
18449a29e3 is described below
commit 18449a29e386ab351e75cfbd1b0d5d29e15614db
Author: Jiaan Geng <[email protected]>
AuthorDate: Tue Sep 2 11:56:58 2025 +0800
    [GLUTEN-10524] Remove unnecessary `outputAttributes` from `BasicScanExecTransformer` (#10525)
---
.../gluten/backendsapi/clickhouse/CHIteratorApi.scala | 2 +-
.../gluten/execution/MicroBatchScanExecTransformer.scala | 4 +---
.../gluten/execution/BasicScanExecTransformer.scala | 3 ---
.../gluten/execution/BatchScanExecTransformer.scala | 2 --
.../gluten/execution/FileSourceScanExecTransformer.scala | 2 --
.../spark/sql/hive/HiveTableScanExecTransformer.scala | 14 +++++---------
.../sql/hive/execution/AbstractHiveTableScanExec.scala | 16 ++++++++++++----
.../sql/hive/execution/AbstractHiveTableScanExec.scala | 16 ++++++++++++----
.../sql/hive/execution/AbstractHiveTableScanExec.scala | 16 ++++++++++++----
.../sql/hive/execution/AbstractHiveTableScanExec.scala | 16 ++++++++++++----
.../sql/hive/execution/AbstractHiveTableScanExec.scala | 16 ++++++++++++----
11 files changed, 67 insertions(+), 40 deletions(-)
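
In short: every scan transformer previously exposed its schema twice, once through SparkPlan's standard `output` and again through a Gluten-specific `outputAttributes()` that was identical everywhere except the Hive scan, which preferred `prunedOutput` when present. This commit moves that preference into `AbstractHiveTableScanExec.output` itself and deletes the duplicate accessor. Below is a minimal sketch of the resulting shape, not the actual Gluten code: `Attr`, `fieldNames`, and `Demo` are hypothetical stand-ins, and the `SparkSession` plumbing is elided.

    // Simplified sketch of the pattern this commit converges on.
    case class Attr(name: String) // hypothetical stand-in for Spark's Attribute

    abstract class AbstractHiveTableScan(
        requestedAttributes: Seq[Attr],
        prunedOutput: Seq[Attr] = Seq.empty) {
      // The pruned schema, when supplied, now wins inside `output` itself.
      def output: Seq[Attr] =
        if (prunedOutput.nonEmpty) prunedOutput
        else requestedAttributes.distinct
    }

    trait BasicScanTransformer {
      def output: Seq[Attr]
      // Callers that used to call outputAttributes() read `output` directly:
      def fieldNames: Seq[String] = output.map(_.name)
    }

    object Demo extends App {
      val scan = new AbstractHiveTableScan(
        requestedAttributes = Seq(Attr("id"), Attr("name")),
        prunedOutput = Seq(Attr("id"))) with BasicScanTransformer
      println(scan.fieldNames) // List(id): the pruned schema takes precedence
    }

Note also the `val` -> `def` change on `output` in the shims; presumably this keeps the pruning branch from being evaluated and frozen at construction time.
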
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 86560b3d36..62ca100cc3 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -119,7 +119,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
       scan: BasicScanExecTransformer): Unit = {
     if (scan.fileFormat == ReadFileFormat.TextReadFormat) {
       val names =
-        ConverterUtils.collectAttributeNamesWithoutExprId(scan.outputAttributes())
+        ConverterUtils.collectAttributeNamesWithoutExprId(scan.output)
       localFilesNode.setFileSchema(getFileSchema(scan.getDataSchema, names.asScala.toSeq))
     }
   }
diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
index c2bb53e15e..f644322764 100644
--- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
+++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.substrait.rel.{ReadRelNode, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
@@ -67,8 +67,6 @@ case class MicroBatchScanExecTransformer(
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
-  override def outputAttributes(): Seq[Attribute] = output
-
   override def getPartitions: Seq[InputPartition] = inputPartitionsShim
   /** Returns the actual schema of this data source scan. */
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 0c38f645ce..7967122424 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -41,8 +41,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   /** Returns the filters that can be pushed down to native file scan */
   def filterExprs(): Seq[Expression]
-  def outputAttributes(): Seq[Attribute]
-
   def getMetadataColumns(): Seq[AttributeReference]
   /** This can be used to report FileFormat for a file based scan operator. */
@@ -117,7 +115,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   }
   override protected def doTransform(context: SubstraitContext): TransformContext = {
-    val output = outputAttributes()
     val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
     val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
     val columnTypeNodes = output.map {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 30b44159c3..ae13078f99 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -128,8 +128,6 @@ abstract class BatchScanExecTransformerBase(
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
-  override def outputAttributes(): Seq[Attribute] = output
-
   // With storage partition join, the return partition type is changed, so as SplitInfo
   def getPartitionsWithIndex: Seq[Seq[InputPartition]] = finalPartitions
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 072ab78e6c..61c94747bc 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -123,8 +123,6 @@ abstract class FileSourceScanExecTransformerBase(
   override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
-  override def outputAttributes(): Seq[Attribute] = output
-
   override def getPartitions: Seq[InputPartition] = {
     BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
       relation,
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 6911672376..d83edfb22d 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -46,7 +46,11 @@ case class HiveTableScanExecTransformer(
     relation: HiveTableRelation,
     partitionPruningPred: Seq[Expression],
     prunedOutput: Seq[Attribute] = Seq.empty[Attribute])(@transient session: SparkSession)
-  extends AbstractHiveTableScanExec(requestedAttributes, relation, partitionPruningPred)(session)
+  extends AbstractHiveTableScanExec(
+    requestedAttributes,
+    relation,
+    partitionPruningPred,
+    prunedOutput)(session)
   with BasicScanExecTransformer {
   @transient override lazy val metrics: Map[String, SQLMetric] =
@@ -63,14 +67,6 @@ case class HiveTableScanExecTransformer(
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
-  override def outputAttributes(): Seq[Attribute] = {
-    if (prunedOutput.nonEmpty) {
-      prunedOutput
-    } else {
-      output
-    }
-  }
-
   override def getPartitions: Seq[InputPartition] = partitions
   override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index e1ab055e18..2ba66c795a 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -50,11 +50,15 @@ import scala.collection.JavaConverters._
  *   The Hive table be scanned.
  * @param partitionPruningPred
  *   An optional partition pruning predicate for partitioned table.
+ * @param prunedOutput
+ *   The pruned output.
  */
 abstract private[hive] class AbstractHiveTableScanExec(
     requestedAttributes: Seq[Attribute],
     relation: HiveTableRelation,
-    partitionPruningPred: Seq[Expression])(@transient protected val sparkSession: SparkSession)
+    partitionPruningPred: Seq[Expression],
+    prunedOutput: Seq[Attribute] = Seq.empty[Attribute])(
+      @transient protected val sparkSession: SparkSession)
   extends LeafExecNode
   with CastSupport {
@@ -74,9 +78,13 @@ abstract private[hive] class AbstractHiveTableScanExec(
   private val originalAttributes = AttributeMap(relation.output.map(a => a -> a))
-  override val output: Seq[Attribute] = {
-    // Retrieve the original attributes based on expression ID so that capitalization matches.
-    requestedAttributes.map(attr => originalAttributes.getOrElse(attr, attr)).distinct
+  override def output: Seq[Attribute] = {
+    if (prunedOutput.nonEmpty) {
+      prunedOutput
+    } else {
+      // Retrieve the original attributes based on expression ID so that capitalization matches.
+      requestedAttributes.map(attr => originalAttributes.getOrElse(attr, attr)).distinct
+    }
   }
   // Bind all partition key attribute references in the partition pruning predicate for later
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index 52104f3eea..ad1857b721 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -50,11 +50,15 @@ import scala.collection.JavaConverters._
  *   The Hive table be scanned.
  * @param partitionPruningPred
  *   An optional partition pruning predicate for partitioned table.
+ * @param prunedOutput
+ *   The pruned output.
  */
 abstract private[hive] class AbstractHiveTableScanExec(
     requestedAttributes: Seq[Attribute],
     relation: HiveTableRelation,
-    partitionPruningPred: Seq[Expression])(@transient protected val sparkSession: SparkSession)
+    partitionPruningPred: Seq[Expression],
+    prunedOutput: Seq[Attribute] = Seq.empty[Attribute])(
+      @transient protected val sparkSession: SparkSession)
   extends LeafExecNode
   with CastSupport {
@@ -74,9 +78,13 @@ abstract private[hive] class AbstractHiveTableScanExec(
   private val originalAttributes = AttributeMap(relation.output.map(a => a -> a))
-  override val output: Seq[Attribute] = {
-    // Retrieve the original attributes based on expression ID so that capitalization matches.
-    requestedAttributes.map(attr => originalAttributes.getOrElse(attr, attr)).distinct
+  override def output: Seq[Attribute] = {
+    if (prunedOutput.nonEmpty) {
+      prunedOutput
+    } else {
+      // Retrieve the original attributes based on expression ID so that capitalization matches.
+      requestedAttributes.map(attr => originalAttributes.getOrElse(attr, attr)).distinct
+    }
   }
   // Bind all partition key attribute references in the partition pruning predicate for later
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index 48f5bb3cbd..8422d33e52 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -52,11 +52,15 @@ import scala.collection.JavaConverters._
  *   The Hive table be scanned.
  * @param partitionPruningPred
  *   An optional partition pruning predicate for partitioned table.
+ * @param prunedOutput
+ *   The pruned output.
  */
 abstract private[hive] class AbstractHiveTableScanExec(
     requestedAttributes: Seq[Attribute],
     relation: HiveTableRelation,
-    partitionPruningPred: Seq[Expression])(@transient protected val sparkSession: SparkSession)
+    partitionPruningPred: Seq[Expression],
+    prunedOutput: Seq[Attribute] = Seq.empty[Attribute])(
+      @transient protected val sparkSession: SparkSession)
   extends LeafExecNode
   with CastSupport {
@@ -76,9 +80,13 @@ abstract private[hive] class AbstractHiveTableScanExec(
   private val originalAttributes = AttributeMap(relation.output.map(a => a -> a))
-  override val output: Seq[Attribute] = {
-    // Retrieve the original attributes based on expression ID so that capitalization matches.
-    requestedAttributes.map(attr => originalAttributes.getOrElse(attr, attr)).distinct
+  override def output: Seq[Attribute] = {
+    if (prunedOutput.nonEmpty) {
+      prunedOutput
+    } else {
+      // Retrieve the original attributes based on expression ID so that capitalization matches.
+      requestedAttributes.map(attr => originalAttributes.getOrElse(attr, attr)).distinct
+    }
   }
   // Bind all partition key attribute references in the partition pruning predicate for later
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index 48f5bb3cbd..8422d33e52 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -52,11 +52,15 @@ import scala.collection.JavaConverters._
  *   The Hive table be scanned.
  * @param partitionPruningPred
  *   An optional partition pruning predicate for partitioned table.
+ * @param prunedOutput
+ *   The pruned output.
  */
 abstract private[hive] class AbstractHiveTableScanExec(
     requestedAttributes: Seq[Attribute],
     relation: HiveTableRelation,
-    partitionPruningPred: Seq[Expression])(@transient protected val sparkSession: SparkSession)
+    partitionPruningPred: Seq[Expression],
+    prunedOutput: Seq[Attribute] = Seq.empty[Attribute])(
+      @transient protected val sparkSession: SparkSession)
   extends LeafExecNode
   with CastSupport {
@@ -76,9 +80,13 @@ abstract private[hive] class AbstractHiveTableScanExec(
   private val originalAttributes = AttributeMap(relation.output.map(a => a -> a))
-  override val output: Seq[Attribute] = {
-    // Retrieve the original attributes based on expression ID so that capitalization matches.
-    requestedAttributes.map(attr => originalAttributes.getOrElse(attr, attr)).distinct
+  override def output: Seq[Attribute] = {
+    if (prunedOutput.nonEmpty) {
+      prunedOutput
+    } else {
+      // Retrieve the original attributes based on expression ID so that capitalization matches.
+      requestedAttributes.map(attr => originalAttributes.getOrElse(attr, attr)).distinct
+    }
   }
   // Bind all partition key attribute references in the partition pruning predicate for later
diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala b/shims/spark40/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index 48f5bb3cbd..8422d33e52 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++ b/shims/spark40/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -52,11 +52,15 @@ import scala.collection.JavaConverters._
  *   The Hive table be scanned.
  * @param partitionPruningPred
  *   An optional partition pruning predicate for partitioned table.
+ * @param prunedOutput
+ *   The pruned output.
  */
 abstract private[hive] class AbstractHiveTableScanExec(
     requestedAttributes: Seq[Attribute],
     relation: HiveTableRelation,
-    partitionPruningPred: Seq[Expression])(@transient protected val sparkSession: SparkSession)
+    partitionPruningPred: Seq[Expression],
+    prunedOutput: Seq[Attribute] = Seq.empty[Attribute])(
+      @transient protected val sparkSession: SparkSession)
   extends LeafExecNode
   with CastSupport {
@@ -76,9 +80,13 @@ abstract private[hive] class AbstractHiveTableScanExec(
   private val originalAttributes = AttributeMap(relation.output.map(a => a -> a))
-  override val output: Seq[Attribute] = {
-    // Retrieve the original attributes based on expression ID so that capitalization matches.
-    requestedAttributes.map(attr => originalAttributes.getOrElse(attr, attr)).distinct
+  override def output: Seq[Attribute] = {
+    if (prunedOutput.nonEmpty) {
+      prunedOutput
+    } else {
+      // Retrieve the original attributes based on expression ID so that capitalization matches.
+      requestedAttributes.map(attr => originalAttributes.getOrElse(attr, attr)).distinct
+    }
   }
   // Bind all partition key attribute references in the partition pruning predicate for later
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]