This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 2fc808d27 [CORE] Remove IteratorApi.genNativeFileScanRDD; neither the Velox nor the ClickHouse backend needs it. (#5937)
2fc808d27 is described below
commit 2fc808d273712248a142f4448bc7e79285725014
Author: Chang chen <[email protected]>
AuthorDate: Fri May 31 18:06:13 2024 +0800
[CORE] Remove IteratorApi.genNativeFileScanRDD; neither the Velox nor the ClickHouse backend needs it. (#5937)

[CORE] Remove IteratorApi.genNativeFileScanRDD; neither the Velox nor the ClickHouse backend needs it.
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 43 +---
.../benchmarks/CHParquetReadBenchmark.scala | 255 ---------------------
.../backendsapi/velox/VeloxIteratorApi.scala | 15 +-
.../apache/gluten/backendsapi/IteratorApi.scala | 11 -
.../execution/BasicScanExecTransformer.scala | 32 +--
.../execution/BatchScanExecTransformer.scala | 6 -
.../execution/FileSourceScanExecTransformer.scala | 6 -
.../sql/hive/HiveTableScanExecTransformer.scala | 6 -
8 files changed, 6 insertions(+), 368 deletions(-)
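
For context, the backend-facing hook being dropped from the IteratorApi trait is reproduced below (copied from the removal in IteratorApi.scala further down):

    /** Generate Native FileScanRDD, currently only for ClickHouse Backend. */
    def genNativeFileScanRDD(
        sparkContext: SparkContext,
        wsCxt: WholeStageTransformContext,
        splitInfos: Seq[SplitInfo],
        scan: BasicScanExecTransformer,
        numOutputRows: SQLMetric,
        numOutputBatches: SQLMetric,
        scanTime: SQLMetric): RDD[ColumnarBatch]

With the doExecuteColumnar overrides removed from the scan transformers in this commit, scans on both backends presumably flow through the regular whole-stage columnar path rather than a dedicated native FileScanRDD.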
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 4926a97eb..bc16c2d77 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -20,17 +20,16 @@ import org.apache.gluten.{GlutenConfig, GlutenNumaBindingInfo}
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
-import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics, NativeMetrics}
+import org.apache.gluten.metrics.{IMetrics, NativeMetrics}
import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils.LogLevelUtil
import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator, GeneralOutIterator}
-import org.apache.spark.{InterruptibleIterator, SparkConf, SparkContext, TaskContext}
+import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
import org.apache.spark.affinity.CHAffinity
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -315,44 +314,6 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
context.addTaskCompletionListener[Unit](_ => close())
new CloseableCHColumnBatchIterator(resIter, Some(pipelineTime))
}
-
- /** Generate Native FileScanRDD, currently only for ClickHouse Backend. */
- override def genNativeFileScanRDD(
- sparkContext: SparkContext,
- wsCtx: WholeStageTransformContext,
- splitInfos: Seq[SplitInfo],
- scan: BasicScanExecTransformer,
- numOutputRows: SQLMetric,
- numOutputBatches: SQLMetric,
- scanTime: SQLMetric): RDD[ColumnarBatch] = {
- val substraitPlanPartition = GlutenTimeMetric.withMillisTime {
- val planByteArray = wsCtx.root.toProtobuf.toByteArray
- splitInfos.zipWithIndex.map {
- case (splitInfo, index) =>
- val splitInfoByteArray = splitInfo match {
- case filesNode: LocalFilesNode =>
- setFileSchemaForLocalFiles(filesNode, scan)
- filesNode.setFileReadProperties(mapAsJavaMap(scan.getProperties))
- filesNode.toProtobuf.toByteArray
- case extensionTableNode: ExtensionTableNode =>
- extensionTableNode.toProtobuf.toByteArray
- }
-
- GlutenPartition(
- index,
- planByteArray,
- Array(splitInfoByteArray),
- locations = splitInfo.preferredLocations().asScala.toArray)
- }
- }(t => logInfo(s"Generating the Substrait plan took: $t ms."))
-
- new NativeFileScanColumnarRDD(
- sparkContext,
- substraitPlanPartition,
- numOutputRows,
- numOutputBatches,
- scanTime)
- }
}
object CHIteratorApi {
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
deleted file mode 100644
index dc1431fa6..000000000
--- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.benchmarks
-
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.{FileSourceScanExecTransformer, WholeStageTransformContext}
-import org.apache.gluten.expression.ConverterUtils
-import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.substrait.SubstraitContext
-import org.apache.gluten.substrait.plan.PlanBuilder
-import org.apache.gluten.vectorized.{CHBlockConverterJniWrapper, CHNativeBlock}
-
-import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import com.google.common.collect.Lists
-
-import scala.collection.JavaConverters._
-
-/**
- * Benchmark to measure Clickhouse parquet read performance. To run this benchmark:
- * {{{
- * 1. Run in IDEA: run this class directly;
- * 2. Run without IDEA: bin/spark-submit --class <this class>
- *    --jars <spark core test jar>,<spark catalyst test jar>,<spark sql test jar>
- * --conf xxxx=xxx
- * backends-clickhouse-XXX-tests.jar
- * parameters
- *
- * Parameters:
- * 1. parquet files dir;
- * 2. the count of the parquet file to read;
- * 3. the fields to read;
- * 4. the execution count;
- * 5. whether to run vanilla spark benchmarks;
- * }}}
- */
-object CHParquetReadBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark {
-
- protected lazy val appName = "CHParquetReadBenchmark"
- protected lazy val thrdNum = "1"
- protected lazy val memorySize = "4G"
- protected lazy val offheapSize = "4G"
-
- def beforeAll(): Unit = {}
-
- override def getSparkSession: SparkSession = {
- beforeAll()
- val conf = getSparkConf
- .setIfMissing("spark.sql.columnVector.offheap.enabled", "true")
- .set("spark.gluten.sql.columnar.separate.scan.rdd.for.ch", "true")
-
- SparkSession.builder.config(conf).getOrCreate()
- }
-
- override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
- val (parquetDir, readFileCnt, scanSchema, executedCnt, executedVanilla) =
- if (mainArgs.isEmpty) {
-      ("/data/tpch-data/parquet/lineitem", 3, "l_orderkey,l_receiptdate", 5, true)
- } else {
-        (mainArgs(0), mainArgs(1).toInt, mainArgs(2), mainArgs(3).toInt, mainArgs(4).toBoolean)
- }
-
- val chParquet = spark.sql(s"""
- |select $scanSchema from parquet.`$parquetDir`
- |
- |""".stripMargin)
-
- // Get the `FileSourceScanExecTransformer`
- val chScanPlan = chParquet.queryExecution.executedPlan.collect {
- case scan: FileSourceScanExecTransformer => scan
- }
-
- val chFileScan = chScanPlan.head
- val outputAttrs = chFileScan.outputAttributes()
- val filePartitions = chFileScan.getPartitions
- .take(readFileCnt)
- .map(_.asInstanceOf[FilePartition])
-
- val numOutputRows = chFileScan.longMetric("numOutputRows")
- val numOutputVectors = chFileScan.longMetric("outputVectors")
- val scanTime = chFileScan.longMetric("scanTime")
- // Generate Substrait plan
- val substraitContext = new SubstraitContext
- val transformContext = chFileScan.transform(substraitContext)
- val outNames = new java.util.ArrayList[String]()
- for (attr <- outputAttrs) {
- outNames.add(ConverterUtils.genColumnNameWithExprId(attr))
- }
- val planNode =
-      PlanBuilder.makePlan(substraitContext, Lists.newArrayList(transformContext.root), outNames)
-
-    val nativeFileScanRDD = BackendsApiManager.getIteratorApiInstance.genNativeFileScanRDD(
- spark.sparkContext,
- WholeStageTransformContext(planNode, substraitContext),
- chFileScan.getSplitInfos,
- chFileScan,
- numOutputRows,
- numOutputVectors,
- scanTime
- )
-
- // Get the total row count
- val chRowCnt = nativeFileScanRDD
-      .mapPartitionsInternal(batches => batches.map(batch => batch.numRows().toLong))
- .collect()
- .sum
-
- val parquetReadBenchmark =
- new Benchmark(
-        s"Parquet Read $readFileCnt files, fields: $scanSchema, total $chRowCnt records",
- chRowCnt,
- output = output)
-
- parquetReadBenchmark.addCase(s"ClickHouse Parquet Read", executedCnt) {
- _ =>
- val resultRDD: RDD[Long] = nativeFileScanRDD.mapPartitionsInternal {
- batches =>
- batches.map {
- batch =>
- val block = CHNativeBlock.fromColumnarBatch(batch)
- block.totalBytes()
- block.close()
- batch.numRows().toLong
- }
- }
- resultRDD.collect()
- }
-
-    parquetReadBenchmark.addCase(s"ClickHouse Parquet Read to Rows", executedCnt) {
- _ =>
- val resultRDD: RDD[Long] = nativeFileScanRDD.mapPartitionsInternal {
- batches =>
- batches.map {
- batch =>
- val block = CHNativeBlock.fromColumnarBatch(batch)
-                val info =
-                  CHBlockConverterJniWrapper.convertColumnarToRow(block.blockAddress(), null)
- new Iterator[InternalRow] {
- var rowId = 0
- val row = new UnsafeRow(batch.numCols())
- var closed = false
-
- override def hasNext: Boolean = {
- val result = rowId < batch.numRows()
- if (!result && !closed) {
-                      CHBlockConverterJniWrapper.freeMemory(info.memoryAddress, info.totalSize)
- closed = true
- }
- result
- }
-
- override def next: UnsafeRow = {
-                    if (rowId >= batch.numRows()) throw new NoSuchElementException
-
-                    val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
-                    row.pointTo(null, info.memoryAddress + offset, length.toInt)
- rowId += 1
- row
- }
- }.foreach(_.numFields)
- block.close()
-
- batch.numRows().toLong
- }
- }
- resultRDD.collect()
- }
-
- if (executedVanilla) {
- spark.conf.set(GlutenConfig.GLUTEN_ENABLED.key, "false")
-
- val vanillaParquet = spark.sql(s"""
-                                        |select $scanSchema from parquet.`$parquetDir`
- |
- |""".stripMargin)
-
-      val vanillaScanPlan = vanillaParquet.queryExecution.executedPlan.collect {
- case scan: FileSourceScanExec => scan
- }
-
- val fileScan = vanillaScanPlan.head
- val fileScanOutput = fileScan.output
- val relation = fileScan.relation
- val readFile: PartitionedFile => Iterator[InternalRow] =
- relation.fileFormat.buildReaderWithPartitionValues(
- sparkSession = relation.sparkSession,
- dataSchema = relation.dataSchema,
- partitionSchema = relation.partitionSchema,
- requiredSchema = fileScan.requiredSchema,
- filters = Seq.empty,
- options = relation.options,
-          hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
- )
-
- val newFileScanRDD =
- SparkShimLoader.getSparkShims
- .generateFileScanRDD(spark, readFile, filePartitions, fileScan)
- .asInstanceOf[RDD[ColumnarBatch]]
-
- val rowCnt = newFileScanRDD
-        .mapPartitionsInternal(batches => batches.map(batch => batch.numRows().toLong))
- .collect()
- .sum
-      assert(chRowCnt == rowCnt, "The row count of the benchmark is not equal.")
-
-      parquetReadBenchmark.addCase(s"Vanilla Spark Parquet Read", executedCnt) {
- _ =>
- val resultRDD: RDD[Long] = newFileScanRDD.mapPartitionsInternal {
- batches => batches.map(_.numRows().toLong)
- }
- resultRDD.collect()
- }
-
-      parquetReadBenchmark.addCase(s"Vanilla Spark Parquet Read to Rows", executedCnt) {
- _ =>
- val resultRDD: RDD[Long] = newFileScanRDD.mapPartitionsInternal {
- batches =>
-            val toUnsafe = UnsafeProjection.create(fileScanOutput, fileScanOutput)
- batches.map {
- batch =>
- // Convert to row and decode parquet value
-                batch.rowIterator().asScala.map(toUnsafe).foreach(_.numFields)
- batch.numRows().toLong
- }
- }
- resultRDD.collect()
- }
- }
-
- parquetReadBenchmark.run()
- }
-}
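
The deleted benchmark above invoked genNativeFileScanRDD directly through BackendsApiManager.getIteratorApiInstance, so it is removed wholesale together with the API rather than being ported to another scan path.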
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index f1fbf3648..5f9b5afa9 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -27,9 +27,8 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils._
import org.apache.gluten.vectorized._
-import org.apache.spark.{SparkConf, SparkContext, TaskContext}
+import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
import org.apache.spark.softaffinity.SoftAffinity
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
@@ -232,16 +231,4 @@ class VeloxIteratorApi extends IteratorApi with Logging {
.create()
}
// scalastyle:on argcount
-
- /** Generate Native FileScanRDD, currently only for ClickHouse Backend. */
- override def genNativeFileScanRDD(
- sparkContext: SparkContext,
- wsCxt: WholeStageTransformContext,
- splitInfos: Seq[SplitInfo],
- scan: BasicScanExecTransformer,
- numOutputRows: SQLMetric,
- numOutputBatches: SQLMetric,
- scanTime: SQLMetric): RDD[ColumnarBatch] = {
-    throw new UnsupportedOperationException("Cannot support to generate Native FileScanRDD.")
- }
}
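
Note that the Velox override removed here was a pure stub that only threw UnsupportedOperationException, so dropping it together with the trait method is behavior-preserving for the Velox backend.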
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index d999948d7..53dc8f478 100644
index d999948d7..53dc8f478 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -24,7 +24,6 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.SplitInfo
import org.apache.spark._
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
@@ -81,14 +80,4 @@ trait IteratorApi {
partitionIndex: Int,
materializeInput: Boolean = false): Iterator[ColumnarBatch]
// scalastyle:on argcount
-
- /** Generate Native FileScanRDD, currently only for ClickHouse Backend. */
- def genNativeFileScanRDD(
- sparkContext: SparkContext,
- wsCxt: WholeStageTransformContext,
- splitInfos: Seq[SplitInfo],
- scan: BasicScanExecTransformer,
- numOutputRows: SQLMetric,
- numOutputBatches: SQLMetric,
- scanTime: SQLMetric): RDD[ColumnarBatch]
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 2dd5aff76..af35957ec 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -22,18 +22,14 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.`type`.ColumnTypeNode
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
-import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import com.google.common.collect.Lists
import com.google.protobuf.StringValue
import scala.collection.JavaConverters._
@@ -75,28 +71,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
      .genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name)))
}
- def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
- val numOutputRows = longMetric("numOutputRows")
- val numOutputVectors = longMetric("outputVectors")
- val scanTime = longMetric("scanTime")
- val substraitContext = new SubstraitContext
- val transformContext = transform(substraitContext)
-    val outNames =
-      filteRedundantField(outputAttributes()).map(ConverterUtils.genColumnNameWithExprId).asJava
- val planNode =
-      PlanBuilder.makePlan(substraitContext, Lists.newArrayList(transformContext.root), outNames)
-
- BackendsApiManager.getIteratorApiInstance.genNativeFileScanRDD(
- sparkContext,
- WholeStageTransformContext(planNode, substraitContext),
- getSplitInfos,
- this,
- numOutputRows,
- numOutputVectors,
- scanTime
- )
- }
-
override protected def doValidateInternal(): ValidationResult = {
var fields = schema.fields
@@ -182,9 +156,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
def filteRedundantField(outputs: Seq[Attribute]): Seq[Attribute] = {
var final_output: List[Attribute] = List()
val outputList = outputs.toArray
- for (i <- 0 to outputList.size - 1) {
+ for (i <- outputList.indices) {
var dup = false
- for (j <- 0 to i - 1) {
+ for (j <- 0 until i) {
if (outputList(i).name == outputList(j).name) {
dup = true
}
@@ -193,6 +167,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
final_output = final_output :+ outputList(i)
}
}
- final_output.toSeq
+ final_output
}
}
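
For reference, the retained filteRedundantField loop deduplicates output attributes by name, keeping the first occurrence; the hunks above only tighten the loop bounds to idiomatic indices/until ranges. A hypothetical, functionally equivalent sketch of the same dedup logic in Scala (not part of this commit):

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import scala.collection.mutable

    // Keep the first attribute seen for each name, preserving input
    // order. mutable.Set.add returns false for an already-seen name,
    // so later duplicates are filtered out.
    def dedupByName(outputs: Seq[Attribute]): Seq[Attribute] = {
      val seen = mutable.Set.empty[String]
      outputs.filter(attr => seen.add(attr.name))
    }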
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index b0c8c59e7..64d9d6546 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -22,7 +22,6 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -31,7 +30,6 @@ import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.vectorized.ColumnarBatch
/** Columnar Based BatchScanExec. */
case class BatchScanExecTransformer(
@@ -144,10 +142,6 @@ abstract class BatchScanExecTransformerBase(
super.doValidateInternal()
}
- override def doExecuteColumnar(): RDD[ColumnarBatch] = {
- doExecuteColumnarInternal()
- }
-
override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genBatchScanTransformerMetricsUpdater(metrics)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index c3d2da7f0..ff905251b 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -21,7 +21,6 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -30,7 +29,6 @@ import org.apache.spark.sql.execution.FileSourceScanExecShim
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.collection.BitSet
case class FileSourceScanExecTransformer(
@@ -147,10 +145,6 @@ abstract class FileSourceScanExecTransformerBase(
super.doValidateInternal()
}
- override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
- doExecuteColumnarInternal()
- }
-
override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genFileSourceScanTransformerMetricsUpdater(metrics)
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 2952267e5..5dfa85b26 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -22,7 +22,6 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression}
@@ -34,7 +33,6 @@ import org.apache.spark.sql.hive.HiveTableScanExecTransformer._
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.execution.{AbstractHiveTableScanExec, HiveTableScanExec}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
@@ -81,10 +79,6 @@ case class HiveTableScanExecTransformer(
override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genHiveTableScanTransformerMetricsUpdater(metrics)
- override def doExecuteColumnar(): RDD[ColumnarBatch] = {
- doExecuteColumnarInternal()
- }
-
@transient private lazy val hivePartitionConverter =
new HivePartitionConverter(session.sessionState.newHadoopConf(), session)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]