This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fc808d27 [CORE] Remove IteratorApi.genNativeFileScanRDD; neither the velox nor the clickhouse backend needs it. (#5937)
2fc808d27 is described below

commit 2fc808d273712248a142f4448bc7e79285725014
Author: Chang chen <[email protected]>
AuthorDate: Fri May 31 18:06:13 2024 +0800

    [CORE] Remove IteratorApi.genNativeFileScanRDD; neither the velox nor the clickhouse backend needs it. (#5937)
    
    [CORE] Remove IteratorApi.genNativeFileScanRDD; neither the velox nor the clickhouse backend needs it.
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  43 +---
 .../benchmarks/CHParquetReadBenchmark.scala        | 255 ---------------------
 .../backendsapi/velox/VeloxIteratorApi.scala       |  15 +-
 .../apache/gluten/backendsapi/IteratorApi.scala    |  11 -
 .../execution/BasicScanExecTransformer.scala       |  32 +--
 .../execution/BatchScanExecTransformer.scala       |   6 -
 .../execution/FileSourceScanExecTransformer.scala  |   6 -
 .../sql/hive/HiveTableScanExecTransformer.scala    |   6 -
 8 files changed, 6 insertions(+), 368 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 4926a97eb..bc16c2d77 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -20,17 +20,16 @@ import org.apache.gluten.{GlutenConfig, GlutenNumaBindingInfo}
 import org.apache.gluten.backendsapi.IteratorApi
 import org.apache.gluten.execution._
 import org.apache.gluten.expression.ConverterUtils
-import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics, NativeMetrics}
+import org.apache.gluten.metrics.{IMetrics, NativeMetrics}
 import org.apache.gluten.substrait.plan.PlanNode
 import org.apache.gluten.substrait.rel._
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.utils.LogLevelUtil
 import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator, GeneralOutIterator}
 
-import org.apache.spark.{InterruptibleIterator, SparkConf, SparkContext, TaskContext}
+import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
 import org.apache.spark.affinity.CHAffinity
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -315,44 +314,6 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
     context.addTaskCompletionListener[Unit](_ => close())
     new CloseableCHColumnBatchIterator(resIter, Some(pipelineTime))
   }
-
-  /** Generate Native FileScanRDD, currently only for ClickHouse Backend. */
-  override def genNativeFileScanRDD(
-      sparkContext: SparkContext,
-      wsCtx: WholeStageTransformContext,
-      splitInfos: Seq[SplitInfo],
-      scan: BasicScanExecTransformer,
-      numOutputRows: SQLMetric,
-      numOutputBatches: SQLMetric,
-      scanTime: SQLMetric): RDD[ColumnarBatch] = {
-    val substraitPlanPartition = GlutenTimeMetric.withMillisTime {
-      val planByteArray = wsCtx.root.toProtobuf.toByteArray
-      splitInfos.zipWithIndex.map {
-        case (splitInfo, index) =>
-          val splitInfoByteArray = splitInfo match {
-            case filesNode: LocalFilesNode =>
-              setFileSchemaForLocalFiles(filesNode, scan)
-              filesNode.setFileReadProperties(mapAsJavaMap(scan.getProperties))
-              filesNode.toProtobuf.toByteArray
-            case extensionTableNode: ExtensionTableNode =>
-              extensionTableNode.toProtobuf.toByteArray
-          }
-
-          GlutenPartition(
-            index,
-            planByteArray,
-            Array(splitInfoByteArray),
-            locations = splitInfo.preferredLocations().asScala.toArray)
-      }
-    }(t => logInfo(s"Generating the Substrait plan took: $t ms."))
-
-    new NativeFileScanColumnarRDD(
-      sparkContext,
-      substraitPlanPartition,
-      numOutputRows,
-      numOutputBatches,
-      scanTime)
-  }
 }
 
 object CHIteratorApi {
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
deleted file mode 100644
index dc1431fa6..000000000
--- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.benchmarks
-
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.{FileSourceScanExecTransformer, WholeStageTransformContext}
-import org.apache.gluten.expression.ConverterUtils
-import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.substrait.SubstraitContext
-import org.apache.gluten.substrait.plan.PlanBuilder
-import org.apache.gluten.vectorized.{CHBlockConverterJniWrapper, CHNativeBlock}
-
-import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import com.google.common.collect.Lists
-
-import scala.collection.JavaConverters._
-
-/**
- * Benchmark to measure Clickhouse parquet read performance. To run this benchmark:
- * {{{
- *   1. Run in IDEA: run this class directly;
- *   2. Run without IDEA: bin/spark-submit --class <this class>
- *        --jars <spark core test jar>,<spark catalyst test jar>,<spark sql test jar>
- *        --conf xxxx=xxx
- *        backends-clickhouse-XXX-tests.jar
- *        parameters
- *
- *   Parameters:
- *     1. parquet files dir;
- *     2. the count of the parquet file to read;
- *     3. the fields to read;
- *     4. the execution count;
- *     5. whether to run vanilla spark benchmarks;
- * }}}
- */
-object CHParquetReadBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark {
-
-  protected lazy val appName = "CHParquetReadBenchmark"
-  protected lazy val thrdNum = "1"
-  protected lazy val memorySize = "4G"
-  protected lazy val offheapSize = "4G"
-
-  def beforeAll(): Unit = {}
-
-  override def getSparkSession: SparkSession = {
-    beforeAll()
-    val conf = getSparkConf
-      .setIfMissing("spark.sql.columnVector.offheap.enabled", "true")
-      .set("spark.gluten.sql.columnar.separate.scan.rdd.for.ch", "true")
-
-    SparkSession.builder.config(conf).getOrCreate()
-  }
-
-  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
-    val (parquetDir, readFileCnt, scanSchema, executedCnt, executedVanilla) =
-      if (mainArgs.isEmpty) {
-        ("/data/tpch-data/parquet/lineitem", 3, "l_orderkey,l_receiptdate", 5, true)
-      } else {
-        (mainArgs(0), mainArgs(1).toInt, mainArgs(2), mainArgs(3).toInt, mainArgs(4).toBoolean)
-      }
-
-    val chParquet = spark.sql(s"""
-                                 |select $scanSchema from parquet.`$parquetDir`
-                                 |
-                                 |""".stripMargin)
-
-    // Get the `FileSourceScanExecTransformer`
-    val chScanPlan = chParquet.queryExecution.executedPlan.collect {
-      case scan: FileSourceScanExecTransformer => scan
-    }
-
-    val chFileScan = chScanPlan.head
-    val outputAttrs = chFileScan.outputAttributes()
-    val filePartitions = chFileScan.getPartitions
-      .take(readFileCnt)
-      .map(_.asInstanceOf[FilePartition])
-
-    val numOutputRows = chFileScan.longMetric("numOutputRows")
-    val numOutputVectors = chFileScan.longMetric("outputVectors")
-    val scanTime = chFileScan.longMetric("scanTime")
-    // Generate Substrait plan
-    val substraitContext = new SubstraitContext
-    val transformContext = chFileScan.transform(substraitContext)
-    val outNames = new java.util.ArrayList[String]()
-    for (attr <- outputAttrs) {
-      outNames.add(ConverterUtils.genColumnNameWithExprId(attr))
-    }
-    val planNode =
-      PlanBuilder.makePlan(substraitContext, Lists.newArrayList(transformContext.root), outNames)
-
-    val nativeFileScanRDD = BackendsApiManager.getIteratorApiInstance.genNativeFileScanRDD(
-      spark.sparkContext,
-      WholeStageTransformContext(planNode, substraitContext),
-      chFileScan.getSplitInfos,
-      chFileScan,
-      numOutputRows,
-      numOutputVectors,
-      scanTime
-    )
-
-    // Get the total row count
-    val chRowCnt = nativeFileScanRDD
-      .mapPartitionsInternal(batches => batches.map(batch => batch.numRows().toLong))
-      .collect()
-      .sum
-
-    val parquetReadBenchmark =
-      new Benchmark(
-        s"Parquet Read $readFileCnt files, fields: $scanSchema, total $chRowCnt records",
-        chRowCnt,
-        output = output)
-
-    parquetReadBenchmark.addCase(s"ClickHouse Parquet Read", executedCnt) {
-      _ =>
-        val resultRDD: RDD[Long] = nativeFileScanRDD.mapPartitionsInternal {
-          batches =>
-            batches.map {
-              batch =>
-                val block = CHNativeBlock.fromColumnarBatch(batch)
-                block.totalBytes()
-                block.close()
-                batch.numRows().toLong
-            }
-        }
-        resultRDD.collect()
-    }
-
-    parquetReadBenchmark.addCase(s"ClickHouse Parquet Read to Rows", executedCnt) {
-      _ =>
-        val resultRDD: RDD[Long] = nativeFileScanRDD.mapPartitionsInternal {
-          batches =>
-            batches.map {
-              batch =>
-                val block = CHNativeBlock.fromColumnarBatch(batch)
-                val info =
-                  CHBlockConverterJniWrapper.convertColumnarToRow(block.blockAddress(), null)
-                new Iterator[InternalRow] {
-                  var rowId = 0
-                  val row = new UnsafeRow(batch.numCols())
-                  var closed = false
-
-                  override def hasNext: Boolean = {
-                    val result = rowId < batch.numRows()
-                    if (!result && !closed) {
-                      CHBlockConverterJniWrapper.freeMemory(info.memoryAddress, info.totalSize)
-                      closed = true
-                    }
-                    result
-                  }
-
-                  override def next: UnsafeRow = {
-                    if (rowId >= batch.numRows()) throw new NoSuchElementException
-
-                    val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
-                    row.pointTo(null, info.memoryAddress + offset, length.toInt)
-                    rowId += 1
-                    row
-                  }
-                }.foreach(_.numFields)
-                block.close()
-
-                batch.numRows().toLong
-            }
-        }
-        resultRDD.collect()
-    }
-
-    if (executedVanilla) {
-      spark.conf.set(GlutenConfig.GLUTEN_ENABLED.key, "false")
-
-      val vanillaParquet = spark.sql(s"""
-                                        |select $scanSchema from parquet.`$parquetDir`
-                                        |
-                                        |""".stripMargin)
-
-      val vanillaScanPlan = vanillaParquet.queryExecution.executedPlan.collect {
-        case scan: FileSourceScanExec => scan
-      }
-
-      val fileScan = vanillaScanPlan.head
-      val fileScanOutput = fileScan.output
-      val relation = fileScan.relation
-      val readFile: PartitionedFile => Iterator[InternalRow] =
-        relation.fileFormat.buildReaderWithPartitionValues(
-          sparkSession = relation.sparkSession,
-          dataSchema = relation.dataSchema,
-          partitionSchema = relation.partitionSchema,
-          requiredSchema = fileScan.requiredSchema,
-          filters = Seq.empty,
-          options = relation.options,
-          hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
-        )
-
-      val newFileScanRDD =
-        SparkShimLoader.getSparkShims
-          .generateFileScanRDD(spark, readFile, filePartitions, fileScan)
-          .asInstanceOf[RDD[ColumnarBatch]]
-
-      val rowCnt = newFileScanRDD
-        .mapPartitionsInternal(batches => batches.map(batch => batch.numRows().toLong))
-        .collect()
-        .sum
-      assert(chRowCnt == rowCnt, "The row count of the benchmark is not equal.")
-
-      parquetReadBenchmark.addCase(s"Vanilla Spark Parquet Read", executedCnt) {
-        _ =>
-          val resultRDD: RDD[Long] = newFileScanRDD.mapPartitionsInternal {
-            batches => batches.map(_.numRows().toLong)
-          }
-          resultRDD.collect()
-      }
-
-      parquetReadBenchmark.addCase(s"Vanilla Spark Parquet Read to Rows", executedCnt) {
-        _ =>
-          val resultRDD: RDD[Long] = newFileScanRDD.mapPartitionsInternal {
-            batches =>
-              val toUnsafe = UnsafeProjection.create(fileScanOutput, fileScanOutput)
-              batches.map {
-                batch =>
-                  // Convert to row and decode parquet value
-                  batch.rowIterator().asScala.map(toUnsafe).foreach(_.numFields)
-                  batch.numRows().toLong
-              }
-          }
-          resultRDD.collect()
-      }
-    }
-
-    parquetReadBenchmark.run()
-  }
-}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index f1fbf3648..5f9b5afa9 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -27,9 +27,8 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.utils._
 import org.apache.gluten.vectorized._
 
-import org.apache.spark.{SparkConf, SparkContext, TaskContext}
+import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
 import org.apache.spark.softaffinity.SoftAffinity
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
@@ -232,16 +231,4 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       .create()
   }
   // scalastyle:on argcount
-
-  /** Generate Native FileScanRDD, currently only for ClickHouse Backend. */
-  override def genNativeFileScanRDD(
-      sparkContext: SparkContext,
-      wsCxt: WholeStageTransformContext,
-      splitInfos: Seq[SplitInfo],
-      scan: BasicScanExecTransformer,
-      numOutputRows: SQLMetric,
-      numOutputBatches: SQLMetric,
-      scanTime: SQLMetric): RDD[ColumnarBatch] = {
-    throw new UnsupportedOperationException("Cannot support to generate Native FileScanRDD.")
-  }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index d999948d7..53dc8f478 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -24,7 +24,6 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.SplitInfo
 
 import org.apache.spark._
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
@@ -81,14 +80,4 @@ trait IteratorApi {
       partitionIndex: Int,
       materializeInput: Boolean = false): Iterator[ColumnarBatch]
   // scalastyle:on argcount
-
-  /** Generate Native FileScanRDD, currently only for ClickHouse Backend. */
-  def genNativeFileScanRDD(
-      sparkContext: SparkContext,
-      wsCxt: WholeStageTransformContext,
-      splitInfos: Seq[SplitInfo],
-      scan: BasicScanExecTransformer,
-      numOutputRows: SQLMetric,
-      numOutputBatches: SQLMetric,
-      scanTime: SQLMetric): RDD[ColumnarBatch]
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 2dd5aff76..af35957ec 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -22,18 +22,14 @@ import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.substrait.`type`.ColumnTypeNode
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.extensions.ExtensionBuilder
-import org.apache.gluten.substrait.plan.PlanBuilder
 import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
-import org.apache.spark.sql.vectorized.ColumnarBatch
 
-import com.google.common.collect.Lists
 import com.google.protobuf.StringValue
 
 import scala.collection.JavaConverters._
@@ -75,28 +71,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
         .genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name)))
   }
 
-  def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
-    val numOutputRows = longMetric("numOutputRows")
-    val numOutputVectors = longMetric("outputVectors")
-    val scanTime = longMetric("scanTime")
-    val substraitContext = new SubstraitContext
-    val transformContext = transform(substraitContext)
-    val outNames =
-      filteRedundantField(outputAttributes()).map(ConverterUtils.genColumnNameWithExprId).asJava
-    val planNode =
-      PlanBuilder.makePlan(substraitContext, Lists.newArrayList(transformContext.root), outNames)
-
-    BackendsApiManager.getIteratorApiInstance.genNativeFileScanRDD(
-      sparkContext,
-      WholeStageTransformContext(planNode, substraitContext),
-      getSplitInfos,
-      this,
-      numOutputRows,
-      numOutputVectors,
-      scanTime
-    )
-  }
-
   override protected def doValidateInternal(): ValidationResult = {
     var fields = schema.fields
 
@@ -182,9 +156,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   def filteRedundantField(outputs: Seq[Attribute]): Seq[Attribute] = {
     var final_output: List[Attribute] = List()
     val outputList = outputs.toArray
-    for (i <- 0 to outputList.size - 1) {
+    for (i <- outputList.indices) {
       var dup = false
-      for (j <- 0 to i - 1) {
+      for (j <- 0 until i) {
         if (outputList(i).name == outputList(j).name) {
           dup = true
         }
@@ -193,6 +167,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
         final_output = final_output :+ outputList(i)
       }
     }
-    final_output.toSeq
+    final_output
   }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index b0c8c59e7..64d9d6546 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -22,7 +22,6 @@ import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -31,7 +30,6 @@ import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /** Columnar Based BatchScanExec. */
 case class BatchScanExecTransformer(
@@ -144,10 +142,6 @@ abstract class BatchScanExecTransformerBase(
     super.doValidateInternal()
   }
 
-  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    doExecuteColumnarInternal()
-  }
-
   override def metricsUpdater(): MetricsUpdater =
     BackendsApiManager.getMetricsApiInstance.genBatchScanTransformerMetricsUpdater(metrics)
 
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index c3d2da7f0..ff905251b 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -21,7 +21,6 @@ import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -30,7 +29,6 @@ import org.apache.spark.sql.execution.FileSourceScanExecShim
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.collection.BitSet
 
 case class FileSourceScanExecTransformer(
@@ -147,10 +145,6 @@ abstract class FileSourceScanExecTransformerBase(
     super.doValidateInternal()
   }
 
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    doExecuteColumnarInternal()
-  }
-
   override def metricsUpdater(): MetricsUpdater =
     BackendsApiManager.getMetricsApiInstance.genFileSourceScanTransformerMetricsUpdater(metrics)
 
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 2952267e5..5dfa85b26 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -22,7 +22,6 @@ import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression}
@@ -34,7 +33,6 @@ import org.apache.spark.sql.hive.HiveTableScanExecTransformer._
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.execution.{AbstractHiveTableScanExec, HiveTableScanExec}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
@@ -81,10 +79,6 @@ case class HiveTableScanExecTransformer(
   override def metricsUpdater(): MetricsUpdater =
     BackendsApiManager.getMetricsApiInstance.genHiveTableScanTransformerMetricsUpdater(metrics)
 
-  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    doExecuteColumnarInternal()
-  }
-
   @transient private lazy val hivePartitionConverter =
     new HivePartitionConverter(session.sessionState.newHadoopConf(), session)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
