Copilot commented on code in PR #1995:
URL: https://github.com/apache/auron/pull/1995#discussion_r2792581956


##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()

Review Comment:
   `nativeHadoopConf` uses `SparkSession.getActiveSession.get`, which can throw 
if there is no active session (e.g., execution triggered outside a SQL 
context). Consider using the same session derivation as 
`NativeHiveTableScanBase.broadcastedHadoopConf` 
(`Shims.get.getSqlContext(basedHiveScan).sparkSession`) to avoid runtime 
failures.
   ```suggestion
       val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession
       val hiveConf = sparkSession.sessionState.newHadoopConf()
   ```



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.AuronConverters.getBooleanConf
+import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+
+class HiveConvertProvider extends AuronConvertProvider with Logging {
+  override def isEnabled: Boolean =
+    getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)
+
+  def enableHiveTableScanExec: Boolean =
+    getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = 
false)
+
+  override def isSupported(exec: SparkPlan): Boolean =
+    exec match {
+      case e: HiveTableScanExec if enableHiveTableScanExec &&
+        e.relation.tableMeta.provider.isDefined &&
+        e.relation.tableMeta.provider.get.equals("hive") =>
+        true

Review Comment:
   `isSupported` accepts all Hive tables with provider == "hive", but 
`NativeHiveTableScanExec` only builds native nodes for ORC/Parquet and will 
otherwise throw at runtime (e.g., a `MatchError` on the file format). Add an 
explicit format check here (or make the native exec gracefully fall back) to 
avoid runtime failures on non-ORC/Parquet Hive tables.



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
+  private val minPartitions = if 
(SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+  }
+
+  private val ignoreEmptySplits =
+    
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)

Review Comment:
   Both `minPartitions` and `ignoreEmptySplits` depend on 
`SparkSession.getActiveSession.get`. They should use the plan's own 
`sparkContext` — the same non-optional session/context resolution as the rest 
of the execution code — to avoid a `NoSuchElementException` when there is no 
active session.
   ```suggestion
     private val minPartitions = if (sparkContext.isLocal) {
       0 // will splitted based on block by default.
     } else {
       math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
         sparkContext.defaultMinPartitions)
     }
   
     private val ignoreEmptySplits =
       sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
   ```



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
+  private val minPartitions = if 
(SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+  }
+
+  private val ignoreEmptySplits =
+    
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+  override val nodeName: String =
+    s"NativeHiveTableScan $tableName"
+
+  override def doExecuteNative(): NativeRDD = {
+    val nativeMetrics = SparkMetricNode(
+      metrics,
+      Nil,
+      Some({
+        case ("bytes_scanned", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incBytesRead(v)
+        case ("output_rows", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incRecordsRead(v)
+        case _ =>
+      }))
+    val nativeFileSchema = this.nativeFileSchema
+    val nativeFileGroups = this.nativeFileGroups
+    val nativePartitionSchema = this.nativePartitionSchema
+
+    val projection = schema.map(field => 
relation.schema.fieldIndex(field.name))
+    val broadcastedHadoopConf = this.broadcastedHadoopConf
+    val numPartitions = partitions.length
+
+    new NativeRDD(
+      sparkContext,
+      nativeMetrics,
+      partitions.asInstanceOf[Array[Partition]],
+      None,
+      Nil,
+      rddShuffleReadFull = true,
+      (partition, _) => {
+        val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+        val nativeFileScanConf = pb.FileScanExecConf
+          .newBuilder()
+          .setNumPartitions(numPartitions)
+          .setPartitionIndex(partition.index)
+          .setStatistics(pb.Statistics.getDefaultInstance)
+          .setSchema(nativeFileSchema)
+          .setFileGroup(nativeFileGroup)
+          .addAllProjection(projection.map(Integer.valueOf).asJava)
+          .setPartitionSchema(nativePartitionSchema)
+          .build()
+        fileFormat match {
+          case "orc" =>
+            val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter

Review Comment:
   `fileFormat match` only has cases for "orc" and "parquet". Since 
`HiveTableUtil.getFileFormat` can return "other", this can throw a `MatchError` 
at runtime. Add a default case that either throws a clear unsupported-format 
error or triggers a non-native fallback.



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
+  private val minPartitions = if 
(SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+  }
+
+  private val ignoreEmptySplits =
+    
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+  override val nodeName: String =
+    s"NativeHiveTableScan $tableName"
+
+  override def doExecuteNative(): NativeRDD = {
+    val nativeMetrics = SparkMetricNode(
+      metrics,
+      Nil,
+      Some({
+        case ("bytes_scanned", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incBytesRead(v)
+        case ("output_rows", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incRecordsRead(v)
+        case _ =>
+      }))
+    val nativeFileSchema = this.nativeFileSchema
+    val nativeFileGroups = this.nativeFileGroups
+    val nativePartitionSchema = this.nativePartitionSchema
+
+    val projection = schema.map(field => 
relation.schema.fieldIndex(field.name))
+    val broadcastedHadoopConf = this.broadcastedHadoopConf
+    val numPartitions = partitions.length
+
+    new NativeRDD(
+      sparkContext,
+      nativeMetrics,
+      partitions.asInstanceOf[Array[Partition]],
+      None,
+      Nil,
+      rddShuffleReadFull = true,
+      (partition, _) => {
+        val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+        val nativeFileScanConf = pb.FileScanExecConf
+          .newBuilder()
+          .setNumPartitions(numPartitions)
+          .setPartitionIndex(partition.index)
+          .setStatistics(pb.Statistics.getDefaultInstance)
+          .setSchema(nativeFileSchema)
+          .setFileGroup(nativeFileGroup)
+          .addAllProjection(projection.map(Integer.valueOf).asJava)
+          .setPartitionSchema(nativePartitionSchema)
+          .build()
+        fileFormat match {
+          case "orc" =>
+            val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setOrcScan(nativeOrcScanExecBuilder.build())
+              .build()
+          case "parquet" =>
+            val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setParquetScan(nativeParquetScanExecBuilder.build())
+              .build()
+        }
+      },
+      friendlyName = "NativeRDD.HiveTableScan")
+  }
+
+  override def getFilePartitions(): Array[FilePartition] = {
+    val newJobConf = new JobConf(nativeHadoopConf)
+    val arrayFilePartition = ArrayBuffer[FilePartition]()
+    val partitionedFiles = if (relation.isPartitioned) {
+      val partitions = basedHiveScan.prunedPartitions
+      val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
+      partitions.foreach { partition =>
+        val partDesc = 
Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
+        val partPath = partition.getDataLocation
+        HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, 
nativeTableDesc)(newJobConf)
+        val partitionValues = partition.getTPartition.getValues
+
+        val partitionInternalRow = new 
GenericInternalRow(partitionValues.size())
+        for (partitionIndex <- 0 until partitionValues.size) {
+          partitionInternalRow.update(partitionIndex, 
partitionValues.get(partitionIndex))
+        }
+
+        val inputFormatClass = partDesc.getInputFileFormatClass
+          .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+        arrayPartitionedFile += getArrayPartitionedFile(newJobConf, 
inputFormatClass, partitionInternalRow)
+      }
+      arrayPartitionedFile
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    } else {
+      val inputFormatClass = 
nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, 
Writable]]]
+      getArrayPartitionedFile(newJobConf, inputFormatClass, new 
GenericInternalRow(0))
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    }
+    arrayFilePartition += 
FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
+      partitionedFiles,
+      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
+    arrayFilePartition.toArray
+  }
+
+  private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
+    val defaultMaxSplitBytes = 
sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    Math.min(defaultMaxSplitBytes, openCostInBytes)
+  }
+
+  private def getArrayPartitionedFile(newJobConf: JobConf,
+                               inputFormatClass: Class[newInputClass[Writable, 
Writable]],
+                               partitionInternalRow: GenericInternalRow): 
ArrayBuffer[PartitionedFile] = {
+    val allInputSplits = getInputFormat(newJobConf, 
inputFormatClass).getSplits(newJobConf, minPartitions)
+    val inputSplits = if (ignoreEmptySplits) {
+      allInputSplits.filter(_.getLength > 0)
+    } else {
+      allInputSplits
+    }
+    inputFormatClass match {
+      case OrcInputFormat =>
+      case MapredParquetInputFormat =>
+      case _ =>
+    }
+    val arrayFilePartition = ArrayBuffer[PartitionedFile]()
+    for (i <- 0 until inputSplits.size) {
+      val inputSplit = inputSplits(i)
+      inputSplit match {
+        case FileSplit =>
+          val orcInputSplit = inputSplit.asInstanceOf[FileSplit]

Review Comment:
   `inputSplit match { case FileSplit => ... }` won’t work: `FileSplit` is a 
class, not a matchable singleton. Use a typed pattern like `case fs: FileSplit 
=>` and avoid re-casting the same value immediately after matching.
   ```suggestion
           case orcInputSplit: FileSplit =>
   ```



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  // Hive-side view of the Catalyst table metadata; @transient because Hive's
+  // Table is not serializable and is only needed on the driver.
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  // Short format tag ("orc"/"parquet"/...) derived from the table's InputFormat class.
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  // TableDesc reconstructed from the Hive table; used for serde and partition descriptors.
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  // Hadoop configuration handed to the native scan: the session's Hadoop conf
+  // augmented with (1) the ids/names of the columns actually read and
+  // (2) serde-derived column names/types (LIST_COLUMNS / LIST_COLUMN_TYPES).
+  // NOTE(review): relies on SparkSession.getActiveSession.get — assumes an
+  // active session exists when this lazy val is first forced; confirm callers.
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    // Instantiate and initialize the table's deserializer so column type names
+    // can be derived from its object inspector below.
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    // Comma-separated Hive type names, one per struct field, in field order.
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
+  private val minPartitions = if 
(SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+  }
+
+  private val ignoreEmptySplits =
+    
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+  override val nodeName: String =
+    s"NativeHiveTableScan $tableName"
+
+  /**
+   * Builds the NativeRDD that runs this scan in native code: one protobuf
+   * scan plan (ORC or Parquet) is produced per FilePartition at task time.
+   * NOTE(review): `fileFormat` values other than "orc"/"parquet" fall through
+   * the match and throw MatchError at runtime — confirm upstream filtering.
+   */
+  override def doExecuteNative(): NativeRDD = {
+    // Bridge native metrics into Spark task input metrics
+    // ("bytes_scanned" -> bytesRead, "output_rows" -> recordsRead).
+    val nativeMetrics = SparkMetricNode(
+      metrics,
+      Nil,
+      Some({
+        case ("bytes_scanned", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incBytesRead(v)
+        case ("output_rows", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incRecordsRead(v)
+        case _ =>
+      }))
+    // Capture driver-side values in locals so the task closure does not
+    // serialize the whole plan node.
+    val nativeFileSchema = this.nativeFileSchema
+    val nativeFileGroups = this.nativeFileGroups
+    val nativePartitionSchema = this.nativePartitionSchema
+
+    val projection = schema.map(field => 
relation.schema.fieldIndex(field.name))
+    val broadcastedHadoopConf = this.broadcastedHadoopConf
+    val numPartitions = partitions.length
+
+    new NativeRDD(
+      sparkContext,
+      nativeMetrics,
+      partitions.asInstanceOf[Array[Partition]],
+      None,
+      Nil,
+      rddShuffleReadFull = true,
+      (partition, _) => {
+        // Register the Hadoop conf under a unique id so native code can fetch it.
+        val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+        val nativeFileScanConf = pb.FileScanExecConf
+          .newBuilder()
+          .setNumPartitions(numPartitions)
+          .setPartitionIndex(partition.index)
+          .setStatistics(pb.Statistics.getDefaultInstance)
+          .setSchema(nativeFileSchema)
+          .setFileGroup(nativeFileGroup)
+          .addAllProjection(projection.map(Integer.valueOf).asJava)
+          .setPartitionSchema(nativePartitionSchema)
+          .build()
+        fileFormat match {
+          case "orc" =>
+            val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setOrcScan(nativeOrcScanExecBuilder.build())
+              .build()
+          case "parquet" =>
+            val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setParquetScan(nativeParquetScanExecBuilder.build())
+              .build()
+        }
+      },
+      friendlyName = "NativeRDD.HiveTableScan")
+  }
+
+  override def getFilePartitions(): Array[FilePartition] = {
+    val newJobConf = new JobConf(nativeHadoopConf)
+    val arrayFilePartition = ArrayBuffer[FilePartition]()
+    val partitionedFiles = if (relation.isPartitioned) {
+      val partitions = basedHiveScan.prunedPartitions
+      val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
+      partitions.foreach { partition =>
+        val partDesc = 
Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
+        val partPath = partition.getDataLocation
+        HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, 
nativeTableDesc)(newJobConf)
+        val partitionValues = partition.getTPartition.getValues
+
+        val partitionInternalRow = new 
GenericInternalRow(partitionValues.size())
+        for (partitionIndex <- 0 until partitionValues.size) {
+          partitionInternalRow.update(partitionIndex, 
partitionValues.get(partitionIndex))
+        }
+
+        val inputFormatClass = partDesc.getInputFileFormatClass
+          .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+        arrayPartitionedFile += getArrayPartitionedFile(newJobConf, 
inputFormatClass, partitionInternalRow)
+      }
+      arrayPartitionedFile
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    } else {
+      val inputFormatClass = 
nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, 
Writable]]]
+      getArrayPartitionedFile(newJobConf, inputFormatClass, new 
GenericInternalRow(0))
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    }
+    arrayFilePartition += 
FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
+      partitionedFiles,
+      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
+    arrayFilePartition.toArray
+  }
+
+  private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
+    val defaultMaxSplitBytes = 
sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    Math.min(defaultMaxSplitBytes, openCostInBytes)

Review Comment:
   `getMaxSplitBytes` currently returns `min(filesMaxPartitionBytes, 
filesOpenCostInBytes)`, which can drastically shrink splits and create 
excessive partitions. Elsewhere in this repo (`NativePaimonTableScanExec`) you 
fork Spark’s `FilePartition#maxSplitBytes` logic using 
`min(defaultMaxSplitBytes, max(openCostInBytes, bytesPerCore))`. Align this 
implementation to that logic (or call the shared helper) to avoid performance 
regressions.
   ```suggestion
         getMaxSplitBytes(SparkSession.getActiveSession.get, 
partitionedFiles)).toArray
       arrayFilePartition.toArray
     }
   
     private def getMaxSplitBytes(
         sparkSession: SparkSession,
         partitionedFiles: Seq[PartitionedFile]): Long = {
       val defaultMaxSplitBytes = 
sparkSession.sessionState.conf.filesMaxPartitionBytes
       val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
       val totalBytes = partitionedFiles.map(_.length).sum
       val parallelism = math.max(1, 
sparkSession.sparkContext.defaultParallelism)
       val bytesPerCore = if (totalBytes <= 0L) openCostInBytes else totalBytes 
/ parallelism
       Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
   ```



##########
spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala:
##########
@@ -1048,6 +1015,11 @@ object AuronConverters extends Logging {
     convertToNative(exec)
   }
 
+  def convertHiveTableScanExec(exec: HiveTableScanExec): SparkPlan = {
+    logDebugPlanConversion(exec)
+

Review Comment:
   `convertHiveTableScanExec` has return type `SparkPlan` but currently has no 
implementation/return value. As written it won’t compile; either implement the 
conversion (or delegate to the external provider) or remove the unused stub 
until it’s ready.
   ```suggestion
       convertToNative(exec)
   ```



##########
spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala:
##########
@@ -16,69 +16,39 @@
  */
 package org.apache.spark.sql.auron
 
-import java.util.ServiceLoader
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.protobuf.{EmptyPartitionsExecNode, PhysicalPlanNode}
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+import org.apache.auron.sparkver
 import org.apache.commons.lang3.reflect.MethodUtils
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
 import org.apache.spark.Partition
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.{config, Logging}
-import 
org.apache.spark.sql.auron.AuronConvertStrategy.{childOrderingRequiredTag, 
convertibleTag, convertStrategyTag, convertToNonNativeTag, isNeverConvert, 
joinSmallerSideTag, neverConvertReasonTag}
-import org.apache.spark.sql.auron.NativeConverters.{existTimestampType, 
isTypeSupported, roundRobinTypeSupported, StubExpr}
+import org.apache.spark.internal.{Logging, config}
+import org.apache.spark.sql.auron.AuronConvertStrategy._
+import org.apache.spark.sql.auron.NativeConverters.{StubExpr, 
existTimestampType, isTypeSupported, roundRobinTypeSupported}
 import org.apache.spark.sql.auron.join.JoinBuildSides.{JoinBuildLeft, 
JoinBuildRight, JoinBuildSide}
 import org.apache.spark.sql.auron.util.AuronLogUtils.logDebugPlanConversion
-import org.apache.spark.sql.catalyst.expressions.AggregateWindowFunction
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.expressions.Ascending
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.catalyst.expressions.WindowExpression
-import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
-import org.apache.spark.sql.catalyst.expressions.aggregate.Final
-import org.apache.spark.sql.catalyst.expressions.aggregate.Partial
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
-import org.apache.spark.sql.catalyst.plans.physical.RoundRobinPartitioning
+import org.apache.spark.sql.catalyst.expressions.{AggregateWindowFunction, 
Alias, Ascending, Attribute, AttributeReference, Expression, Literal, 
NamedExpression, SortOrder, WindowExpression, WindowSpecDefinition}
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
AggregateFunction, Final, Partial}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
-import org.apache.spark.sql.execution.aggregate.SortAggregateExec
-import org.apache.spark.sql.execution.auron.plan.ConvertToNativeBase
-import org.apache.spark.sql.execution.auron.plan.NativeAggBase
-import org.apache.spark.sql.execution.auron.plan.NativeBroadcastExchangeBase
-import org.apache.spark.sql.execution.auron.plan.NativeOrcScanBase
-import org.apache.spark.sql.execution.auron.plan.NativeParquetScanBase
-import org.apache.spark.sql.execution.auron.plan.NativeSortBase
-import org.apache.spark.sql.execution.auron.plan.NativeUnionBase
-import org.apache.spark.sql.execution.auron.plan.Util
+import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.auron.plan._
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
 import org.apache.spark.sql.hive.execution.auron.plan.NativeHiveTableScanBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.LongType
 
-import org.apache.auron.configuration.AuronConfiguration
-import org.apache.auron.jni.AuronAdaptor
-import org.apache.auron.metric.SparkMetricNode
-import org.apache.auron.protobuf.EmptyPartitionsExecNode
-import org.apache.auron.protobuf.PhysicalPlanNode
-import org.apache.auron.spark.configuration.SparkAuronConfiguration
-import org.apache.auron.sparkver
+import java.util.ServiceLoader
+import scala.annotation.tailrec

Review Comment:
   `scala.collection.mutable` import was removed, but this file still 
references `mutable.LinkedHashMap` later (e.g., around the partial-agg 
projection logic). This will not compile unless `scala.collection.mutable` (or 
the specific types used) are re-imported or fully qualified.
   ```suggestion
   import scala.annotation.tailrec
   import scala.collection.mutable
   ```



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
+  private val minPartitions = if 
(SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will be split based on HDFS blocks by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)

Review Comment:
   `minPartitions` reads `SparkSession.getActiveSession.get.sparkContext` 
multiple times. Besides the `.get` risk, it’s also inconsistent with other 
native scan implementations in this repo that pass an explicit `sparkSession` 
around. Prefer using a single `sparkSession` resolved from `basedHiveScan` and 
derive `sparkContext` from it.



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}

Review Comment:
   File name `NativeHIveTableScanExec.scala` has inconsistent casing ("HIve") 
compared to the class `NativeHiveTableScanExec`. On case-sensitive filesystems 
this is still valid but is easy to miss/grep incorrectly; consider renaming the 
file to `NativeHiveTableScanExec.scala` for consistency.



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
+  private val minPartitions = if 
(SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will be split based on HDFS blocks by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+  }
+
+  private val ignoreEmptySplits =
+    
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+  override val nodeName: String =
+    s"NativeHiveTableScan $tableName"
+
+  override def doExecuteNative(): NativeRDD = {
+    val nativeMetrics = SparkMetricNode(
+      metrics,
+      Nil,
+      Some({
+        case ("bytes_scanned", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incBytesRead(v)
+        case ("output_rows", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incRecordsRead(v)
+        case _ =>
+      }))
+    val nativeFileSchema = this.nativeFileSchema
+    val nativeFileGroups = this.nativeFileGroups
+    val nativePartitionSchema = this.nativePartitionSchema
+
+    val projection = schema.map(field => 
relation.schema.fieldIndex(field.name))
+    val broadcastedHadoopConf = this.broadcastedHadoopConf
+    val numPartitions = partitions.length
+
+    new NativeRDD(
+      sparkContext,
+      nativeMetrics,
+      partitions.asInstanceOf[Array[Partition]],
+      None,
+      Nil,
+      rddShuffleReadFull = true,
+      (partition, _) => {
+        val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+        val nativeFileScanConf = pb.FileScanExecConf
+          .newBuilder()
+          .setNumPartitions(numPartitions)
+          .setPartitionIndex(partition.index)
+          .setStatistics(pb.Statistics.getDefaultInstance)
+          .setSchema(nativeFileSchema)
+          .setFileGroup(nativeFileGroup)
+          .addAllProjection(projection.map(Integer.valueOf).asJava)
+          .setPartitionSchema(nativePartitionSchema)
+          .build()
+        fileFormat match {
+          case "orc" =>
+            val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setOrcScan(nativeOrcScanExecBuilder.build())
+              .build()
+          case "parquet" =>
+            val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setParquetScan(nativeParquetScanExecBuilder.build())
+              .build()
+        }
+      },
+      friendlyName = "NativeRDD.HiveTableScan")
+  }
+
+  override def getFilePartitions(): Array[FilePartition] = {
+    val newJobConf = new JobConf(nativeHadoopConf)
+    val arrayFilePartition = ArrayBuffer[FilePartition]()
+    val partitionedFiles = if (relation.isPartitioned) {
+      val partitions = basedHiveScan.prunedPartitions
+      val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
+      partitions.foreach { partition =>
+        val partDesc = 
Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
+        val partPath = partition.getDataLocation
+        HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, 
nativeTableDesc)(newJobConf)
+        val partitionValues = partition.getTPartition.getValues
+
+        val partitionInternalRow = new 
GenericInternalRow(partitionValues.size())
+        for (partitionIndex <- 0 until partitionValues.size) {
+          partitionInternalRow.update(partitionIndex, 
partitionValues.get(partitionIndex))
+        }
+
+        val inputFormatClass = partDesc.getInputFileFormatClass
+          .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+        arrayPartitionedFile += getArrayPartitionedFile(newJobConf, 
inputFormatClass, partitionInternalRow)
+      }
+      arrayPartitionedFile
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    } else {
+      val inputFormatClass = 
nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, 
Writable]]]
+      getArrayPartitionedFile(newJobConf, inputFormatClass, new 
GenericInternalRow(0))
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    }
+    arrayFilePartition += 
FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
+      partitionedFiles,
+      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
+    arrayFilePartition.toArray
+  }
+
+  private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
+    val defaultMaxSplitBytes = 
sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    Math.min(defaultMaxSplitBytes, openCostInBytes)
+  }
+
+  private def getArrayPartitionedFile(newJobConf: JobConf,
+                               inputFormatClass: Class[newInputClass[Writable, 
Writable]],
+                               partitionInternalRow: GenericInternalRow): 
ArrayBuffer[PartitionedFile] = {
+    val allInputSplits = getInputFormat(newJobConf, 
inputFormatClass).getSplits(newJobConf, minPartitions)
+    val inputSplits = if (ignoreEmptySplits) {
+      allInputSplits.filter(_.getLength > 0)
+    } else {
+      allInputSplits
+    }
+    inputFormatClass match {
+      case OrcInputFormat =>
+      case MapredParquetInputFormat =>
+      case _ =>
+    }
+    val arrayFilePartition = ArrayBuffer[PartitionedFile]()
+    for (i <- 0 until inputSplits.size) {
+      val inputSplit = inputSplits(i)
+      inputSplit match {
+        case FileSplit =>
+          val orcInputSplit = inputSplit.asInstanceOf[FileSplit]
+          arrayFilePartition +=
+            Shims.get.getPartitionedFile(partitionInternalRow, 
orcInputSplit.getPath.toString,
+              orcInputSplit.getStart, orcInputSplit.getLength)
+      }
+    }
+    arrayFilePartition
+  }
+
+  private def getInputFormat(conf: JobConf, inputFormatClass: 
Class[newInputClass[Writable, Writable]]):
+  InputFormat[Writable, Writable] = {
+    val newInputFormat = 
ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[Writable, Writable]]
+    newInputFormat match {

Review Comment:
   `getInputFormat` takes a `mapreduce.InputFormat` class (`newInputClass`) but 
returns/instantiates `org.apache.hadoop.mapred.InputFormat`. This signature 
mismatch makes the unchecked cast even riskier. Align the parameter type with 
the returned InputFormat type (or vice versa) so the compiler can help enforce 
correctness.



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
  // Minimum number of Hadoop input splits to request from the InputFormat.
  // In local mode use 0 so splitting falls back to HDFS block boundaries;
  // otherwise honor "mapreduce.job.maps" but never go below Spark's
  // defaultMinPartitions.
  private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
    0 // local mode: split based on block size by default.
  } else {
    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
  }

  // Mirrors HadoopRDD: when spark.hadoopRDD.ignoreEmptySplits is set,
  // zero-length input splits are dropped before planning.
  private val ignoreEmptySplits =
    SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+  override val nodeName: String =
+    s"NativeHiveTableScan $tableName"
+
+  override def doExecuteNative(): NativeRDD = {
+    val nativeMetrics = SparkMetricNode(
+      metrics,
+      Nil,
+      Some({
+        case ("bytes_scanned", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incBytesRead(v)
+        case ("output_rows", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incRecordsRead(v)
+        case _ =>
+      }))
+    val nativeFileSchema = this.nativeFileSchema
+    val nativeFileGroups = this.nativeFileGroups
+    val nativePartitionSchema = this.nativePartitionSchema
+
+    val projection = schema.map(field => 
relation.schema.fieldIndex(field.name))
+    val broadcastedHadoopConf = this.broadcastedHadoopConf
+    val numPartitions = partitions.length
+
+    new NativeRDD(
+      sparkContext,
+      nativeMetrics,
+      partitions.asInstanceOf[Array[Partition]],
+      None,
+      Nil,
+      rddShuffleReadFull = true,
+      (partition, _) => {
+        val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+        val nativeFileScanConf = pb.FileScanExecConf
+          .newBuilder()
+          .setNumPartitions(numPartitions)
+          .setPartitionIndex(partition.index)
+          .setStatistics(pb.Statistics.getDefaultInstance)
+          .setSchema(nativeFileSchema)
+          .setFileGroup(nativeFileGroup)
+          .addAllProjection(projection.map(Integer.valueOf).asJava)
+          .setPartitionSchema(nativePartitionSchema)
+          .build()
+        fileFormat match {
+          case "orc" =>
+            val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setOrcScan(nativeOrcScanExecBuilder.build())
+              .build()
+          case "parquet" =>
+            val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setParquetScan(nativeParquetScanExecBuilder.build())
+              .build()
+        }
+      },
+      friendlyName = "NativeRDD.HiveTableScan")
+  }
+
+  override def getFilePartitions(): Array[FilePartition] = {
+    val newJobConf = new JobConf(nativeHadoopConf)
+    val arrayFilePartition = ArrayBuffer[FilePartition]()
+    val partitionedFiles = if (relation.isPartitioned) {
+      val partitions = basedHiveScan.prunedPartitions
+      val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
+      partitions.foreach { partition =>
+        val partDesc = 
Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
+        val partPath = partition.getDataLocation
+        HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, 
nativeTableDesc)(newJobConf)
+        val partitionValues = partition.getTPartition.getValues
+
+        val partitionInternalRow = new 
GenericInternalRow(partitionValues.size())
+        for (partitionIndex <- 0 until partitionValues.size) {
+          partitionInternalRow.update(partitionIndex, 
partitionValues.get(partitionIndex))
+        }

Review Comment:
   Partition values are populated into `GenericInternalRow` using raw strings 
from Hive metastore (`partition.getTPartition.getValues`). Spark scan planning 
usually casts these strings to the partition schema types (and handles 
DEFAULT_PARTITION_NAME -> null / time zones). Without that casting, partition 
values/types may be incorrect. Consider building the partition `InternalRow` 
via the same cast/toRow approach used in `NativePaimonTableScanExec` (or 
existing Spark/Hive utilities).



##########
spark-extension-shims-spark/pom.xml:
##########
@@ -36,6 +36,15 @@
       <artifactId>auron-spark-ui_${scalaVersion}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scalaVersion}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scalaVersion}</artifactId>
+      <scope>provided</scope>
+    </dependency>

Review Comment:
   `spark-hive_${scalaVersion}` is declared twice: once with default scope 
(compile) and again with `<scope>provided</scope>`. This can unintentionally 
bundle Spark Hive classes into this module’s artifact and/or cause dependency 
resolution conflicts. Keep a single dependency entry with the intended scope.



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
+  private val minPartitions = if 
(SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+  }
+
+  private val ignoreEmptySplits =
+    
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+  override val nodeName: String =
+    s"NativeHiveTableScan $tableName"
+
+  override def doExecuteNative(): NativeRDD = {
+    val nativeMetrics = SparkMetricNode(
+      metrics,
+      Nil,
+      Some({
+        case ("bytes_scanned", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incBytesRead(v)
+        case ("output_rows", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incRecordsRead(v)
+        case _ =>
+      }))
+    val nativeFileSchema = this.nativeFileSchema
+    val nativeFileGroups = this.nativeFileGroups
+    val nativePartitionSchema = this.nativePartitionSchema
+
+    val projection = schema.map(field => 
relation.schema.fieldIndex(field.name))
+    val broadcastedHadoopConf = this.broadcastedHadoopConf
+    val numPartitions = partitions.length
+
+    new NativeRDD(
+      sparkContext,
+      nativeMetrics,
+      partitions.asInstanceOf[Array[Partition]],
+      None,
+      Nil,
+      rddShuffleReadFull = true,
+      (partition, _) => {
+        val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+        val nativeFileScanConf = pb.FileScanExecConf
+          .newBuilder()
+          .setNumPartitions(numPartitions)
+          .setPartitionIndex(partition.index)
+          .setStatistics(pb.Statistics.getDefaultInstance)
+          .setSchema(nativeFileSchema)
+          .setFileGroup(nativeFileGroup)
+          .addAllProjection(projection.map(Integer.valueOf).asJava)
+          .setPartitionSchema(nativePartitionSchema)
+          .build()
+        fileFormat match {
+          case "orc" =>
+            val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setOrcScan(nativeOrcScanExecBuilder.build())
+              .build()
+          case "parquet" =>
+            val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setParquetScan(nativeParquetScanExecBuilder.build())
+              .build()
+        }
+      },
+      friendlyName = "NativeRDD.HiveTableScan")
+  }
+
+  override def getFilePartitions(): Array[FilePartition] = {
+    val newJobConf = new JobConf(nativeHadoopConf)
+    val arrayFilePartition = ArrayBuffer[FilePartition]()
+    val partitionedFiles = if (relation.isPartitioned) {
+      val partitions = basedHiveScan.prunedPartitions
+      val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
+      partitions.foreach { partition =>
+        val partDesc = 
Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
+        val partPath = partition.getDataLocation
+        HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, 
nativeTableDesc)(newJobConf)
+        val partitionValues = partition.getTPartition.getValues
+
+        val partitionInternalRow = new 
GenericInternalRow(partitionValues.size())
+        for (partitionIndex <- 0 until partitionValues.size) {
+          partitionInternalRow.update(partitionIndex, 
partitionValues.get(partitionIndex))
+        }
+
+        val inputFormatClass = partDesc.getInputFileFormatClass
+          .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+        arrayPartitionedFile += getArrayPartitionedFile(newJobConf, 
inputFormatClass, partitionInternalRow)
+      }

Review Comment:
   In `getFilePartitions`, `arrayPartitionedFile` is an 
`ArrayBuffer[PartitionedFile]`, but the code uses `+= 
getArrayPartitionedFile(...)` where `getArrayPartitionedFile` returns an 
`ArrayBuffer[PartitionedFile]`. This is a type mismatch and won’t compile; use 
`++=` (or change the helper to return a single `PartitionedFile`).



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
+  private val minPartitions = if 
(SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+  }
+
+  private val ignoreEmptySplits =
+    
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+  override val nodeName: String =
+    s"NativeHiveTableScan $tableName"
+
+  override def doExecuteNative(): NativeRDD = {
+    val nativeMetrics = SparkMetricNode(
+      metrics,
+      Nil,
+      Some({
+        case ("bytes_scanned", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incBytesRead(v)
+        case ("output_rows", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incRecordsRead(v)
+        case _ =>
+      }))
+    val nativeFileSchema = this.nativeFileSchema
+    val nativeFileGroups = this.nativeFileGroups
+    val nativePartitionSchema = this.nativePartitionSchema
+
+    val projection = schema.map(field => 
relation.schema.fieldIndex(field.name))
+    val broadcastedHadoopConf = this.broadcastedHadoopConf
+    val numPartitions = partitions.length
+
+    new NativeRDD(
+      sparkContext,
+      nativeMetrics,
+      partitions.asInstanceOf[Array[Partition]],
+      None,
+      Nil,
+      rddShuffleReadFull = true,
+      (partition, _) => {
+        val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+        val nativeFileScanConf = pb.FileScanExecConf
+          .newBuilder()
+          .setNumPartitions(numPartitions)
+          .setPartitionIndex(partition.index)
+          .setStatistics(pb.Statistics.getDefaultInstance)
+          .setSchema(nativeFileSchema)
+          .setFileGroup(nativeFileGroup)
+          .addAllProjection(projection.map(Integer.valueOf).asJava)
+          .setPartitionSchema(nativePartitionSchema)
+          .build()
+        fileFormat match {
+          case "orc" =>
+            val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setOrcScan(nativeOrcScanExecBuilder.build())
+              .build()
+          case "parquet" =>
+            val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setParquetScan(nativeParquetScanExecBuilder.build())
+              .build()
+        }
+      },
+      friendlyName = "NativeRDD.HiveTableScan")
+  }
+
+  override def getFilePartitions(): Array[FilePartition] = {
+    val newJobConf = new JobConf(nativeHadoopConf)
+    val arrayFilePartition = ArrayBuffer[FilePartition]()
+    val partitionedFiles = if (relation.isPartitioned) {
+      val partitions = basedHiveScan.prunedPartitions
+      val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
+      partitions.foreach { partition =>
+        val partDesc = 
Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
+        val partPath = partition.getDataLocation
+        HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, 
nativeTableDesc)(newJobConf)
+        val partitionValues = partition.getTPartition.getValues
+
+        val partitionInternalRow = new 
GenericInternalRow(partitionValues.size())
+        for (partitionIndex <- 0 until partitionValues.size) {
+          partitionInternalRow.update(partitionIndex, 
partitionValues.get(partitionIndex))
+        }
+
+        val inputFormatClass = partDesc.getInputFileFormatClass
+          .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+        arrayPartitionedFile += getArrayPartitionedFile(newJobConf, 
inputFormatClass, partitionInternalRow)
+      }
+      arrayPartitionedFile
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    } else {
+      val inputFormatClass = 
nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, 
Writable]]]
+      getArrayPartitionedFile(newJobConf, inputFormatClass, new 
GenericInternalRow(0))
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    }
+    arrayFilePartition += 
FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
+      partitionedFiles,
+      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
+    arrayFilePartition.toArray
+  }
+
+  private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
+    val defaultMaxSplitBytes = 
sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    Math.min(defaultMaxSplitBytes, openCostInBytes)
+  }
+
+  private def getArrayPartitionedFile(newJobConf: JobConf,
+                               inputFormatClass: Class[newInputClass[Writable, 
Writable]],
+                               partitionInternalRow: GenericInternalRow): 
ArrayBuffer[PartitionedFile] = {
+    val allInputSplits = getInputFormat(newJobConf, 
inputFormatClass).getSplits(newJobConf, minPartitions)

Review Comment:
   `getArrayPartitionedFile` declares `inputFormatClass` as a Hadoop 
`mapreduce.InputFormat` (`newInputClass`), but the implementation 
instantiates/casts it to the old `mapred.InputFormat` and uses 
`JobConf`/`getSplits(JobConf, Int)`. This API mismatch is unsafe (compile-time 
and runtime). Use a consistent InputFormat API (likely 
`org.apache.hadoop.mapred.InputFormat` given `JobConf`/`FileSplit` usage).



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.AuronConverters.getBooleanConf
+import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+
+class HiveConvertProvider extends AuronConvertProvider with Logging {
+  override def isEnabled: Boolean =
+    getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)
+
+  def enableHiveTableScanExec: Boolean =
+    getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = 
false)
+

Review Comment:
   `HiveConvertProvider` is discovered via `ServiceLoader` (`AuronConverters` 
loads `AuronConvertProvider` implementations). This module currently doesn’t 
include a `META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider` 
entry, so the provider won’t be loaded at runtime (see 
`thirdparty/auron-paimon` for the existing pattern).



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.AuronConverters.getBooleanConf
+import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+
+class HiveConvertProvider extends AuronConvertProvider with Logging {
+  override def isEnabled: Boolean =
+    getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)
+
+  def enableHiveTableScanExec: Boolean =
+    getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = 
false)
+
+  override def isSupported(exec: SparkPlan): Boolean =
+    exec match {
+      case e: HiveTableScanExec if enableHiveTableScanExec &&
+        e.relation.tableMeta.provider.isDefined &&
+        e.relation.tableMeta.provider.get.equals("hive") =>
+        true
+    case _ => false
+  }
+
+  override def convert(exec: SparkPlan): SparkPlan = {
+    exec match {
+      case hiveExec: HiveTableScanExec if enableHiveTableScanExec =>
+        convertHiveTableScanExec(hiveExec)
+      case _ => exec
+    }
+  }
+
+  def convertHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = {
+    AuronConverters.addRenameColumnsExec(NativeHiveTableScanExec(hiveExec))
+  }

Review Comment:
   There are existing query/operator validation test utilities in this module, 
but this new Hive scan conversion path doesn’t appear to have coverage. Add at 
least one suite that enables `spark.auron.enable.hiveTableScanExec` and 
verifies `HiveTableScanExec` is converted (and that unsupported formats don’t 
break execution).



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
+  private val minPartitions = if 
(SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+  }
+
+  private val ignoreEmptySplits =
+    
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+  override val nodeName: String =
+    s"NativeHiveTableScan $tableName"
+
+  override def doExecuteNative(): NativeRDD = {
+    val nativeMetrics = SparkMetricNode(
+      metrics,
+      Nil,
+      Some({
+        case ("bytes_scanned", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incBytesRead(v)
+        case ("output_rows", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incRecordsRead(v)
+        case _ =>
+      }))
+    val nativeFileSchema = this.nativeFileSchema
+    val nativeFileGroups = this.nativeFileGroups
+    val nativePartitionSchema = this.nativePartitionSchema
+
+    val projection = schema.map(field => 
relation.schema.fieldIndex(field.name))
+    val broadcastedHadoopConf = this.broadcastedHadoopConf
+    val numPartitions = partitions.length
+
+    new NativeRDD(
+      sparkContext,
+      nativeMetrics,
+      partitions.asInstanceOf[Array[Partition]],
+      None,
+      Nil,
+      rddShuffleReadFull = true,
+      (partition, _) => {
+        val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+        val nativeFileScanConf = pb.FileScanExecConf
+          .newBuilder()
+          .setNumPartitions(numPartitions)
+          .setPartitionIndex(partition.index)
+          .setStatistics(pb.Statistics.getDefaultInstance)
+          .setSchema(nativeFileSchema)
+          .setFileGroup(nativeFileGroup)
+          .addAllProjection(projection.map(Integer.valueOf).asJava)
+          .setPartitionSchema(nativePartitionSchema)
+          .build()
+        fileFormat match {
+          case "orc" =>
+            val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setOrcScan(nativeOrcScanExecBuilder.build())
+              .build()
+          case "parquet" =>
+            val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setParquetScan(nativeParquetScanExecBuilder.build())
+              .build()
+        }
+      },
+      friendlyName = "NativeRDD.HiveTableScan")
+  }
+
+  override def getFilePartitions(): Array[FilePartition] = {
+    val newJobConf = new JobConf(nativeHadoopConf)
+    val arrayFilePartition = ArrayBuffer[FilePartition]()
+    val partitionedFiles = if (relation.isPartitioned) {
+      val partitions = basedHiveScan.prunedPartitions
+      val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
+      partitions.foreach { partition =>
+        val partDesc = 
Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
+        val partPath = partition.getDataLocation
+        HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, 
nativeTableDesc)(newJobConf)
+        val partitionValues = partition.getTPartition.getValues
+
+        val partitionInternalRow = new 
GenericInternalRow(partitionValues.size())
+        for (partitionIndex <- 0 until partitionValues.size) {
+          partitionInternalRow.update(partitionIndex, 
partitionValues.get(partitionIndex))
+        }
+
+        val inputFormatClass = partDesc.getInputFileFormatClass
+          .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+        arrayPartitionedFile += getArrayPartitionedFile(newJobConf, 
inputFormatClass, partitionInternalRow)
+      }
+      arrayPartitionedFile
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    } else {
+      val inputFormatClass = 
nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, 
Writable]]]
+      getArrayPartitionedFile(newJobConf, inputFormatClass, new 
GenericInternalRow(0))
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    }
+    arrayFilePartition += 
FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
+      partitionedFiles,
+      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
+    arrayFilePartition.toArray
+  }
+
+  private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
+    val defaultMaxSplitBytes = 
sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    Math.min(defaultMaxSplitBytes, openCostInBytes)
+  }
+
+  private def getArrayPartitionedFile(newJobConf: JobConf,
+                               inputFormatClass: Class[newInputClass[Writable, 
Writable]],
+                               partitionInternalRow: GenericInternalRow): 
ArrayBuffer[PartitionedFile] = {
+    val allInputSplits = getInputFormat(newJobConf, 
inputFormatClass).getSplits(newJobConf, minPartitions)
+    val inputSplits = if (ignoreEmptySplits) {
+      allInputSplits.filter(_.getLength > 0)
+    } else {
+      allInputSplits
+    }
+    inputFormatClass match {
+      case OrcInputFormat =>
+      case MapredParquetInputFormat =>
+      case _ =>
+    }

Review Comment:
   `inputFormatClass match { case OrcInputFormat => ... }` matches a 
`Class[_]` value against a stable identifier (the companion object), so none 
of these cases can ever match a `Class` instance — and the case bodies are 
empty anyway. If you need special handling per input format, compare against 
`classOf[OrcInputFormat]` / `classOf[MapredParquetInputFormat]` and implement 
the intended behavior; otherwise remove this dead code block.
   ```suggestion
   
   ```



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+  extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = 
HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = 
HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = 
SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append columns ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: 
Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = 
nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, 
relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
+
+  private val minPartitions = if 
(SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+  }
+
+  private val ignoreEmptySplits =
+    
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+  override val nodeName: String =
+    s"NativeHiveTableScan $tableName"
+
+  override def doExecuteNative(): NativeRDD = {
+    val nativeMetrics = SparkMetricNode(
+      metrics,
+      Nil,
+      Some({
+        case ("bytes_scanned", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incBytesRead(v)
+        case ("output_rows", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incRecordsRead(v)
+        case _ =>
+      }))
+    val nativeFileSchema = this.nativeFileSchema
+    val nativeFileGroups = this.nativeFileGroups
+    val nativePartitionSchema = this.nativePartitionSchema
+
+    val projection = schema.map(field => 
relation.schema.fieldIndex(field.name))
+    val broadcastedHadoopConf = this.broadcastedHadoopConf
+    val numPartitions = partitions.length
+
+    new NativeRDD(
+      sparkContext,
+      nativeMetrics,
+      partitions.asInstanceOf[Array[Partition]],
+      None,
+      Nil,
+      rddShuffleReadFull = true,
+      (partition, _) => {
+        val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+        val nativeFileGroup = 
nativeFileGroups(partition.asInstanceOf[FilePartition])
+        val nativeFileScanConf = pb.FileScanExecConf
+          .newBuilder()
+          .setNumPartitions(numPartitions)
+          .setPartitionIndex(partition.index)
+          .setStatistics(pb.Statistics.getDefaultInstance)
+          .setSchema(nativeFileSchema)
+          .setFileGroup(nativeFileGroup)
+          .addAllProjection(projection.map(Integer.valueOf).asJava)
+          .setPartitionSchema(nativePartitionSchema)
+          .build()
+        fileFormat match {
+          case "orc" =>
+            val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setOrcScan(nativeOrcScanExecBuilder.build())
+              .build()
+          case "parquet" =>
+            val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // not 
support this filter
+
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setParquetScan(nativeParquetScanExecBuilder.build())
+              .build()
+        }
+      },
+      friendlyName = "NativeRDD.HiveTableScan")
+  }
+
+  override def getFilePartitions(): Array[FilePartition] = {
+    val newJobConf = new JobConf(nativeHadoopConf)
+    val arrayFilePartition = ArrayBuffer[FilePartition]()
+    val partitionedFiles = if (relation.isPartitioned) {
+      val partitions = basedHiveScan.prunedPartitions
+      val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
+      partitions.foreach { partition =>
+        val partDesc = 
Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
+        val partPath = partition.getDataLocation
+        HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, 
nativeTableDesc)(newJobConf)
+        val partitionValues = partition.getTPartition.getValues
+
+        val partitionInternalRow = new 
GenericInternalRow(partitionValues.size())
+        for (partitionIndex <- 0 until partitionValues.size) {
+          partitionInternalRow.update(partitionIndex, 
partitionValues.get(partitionIndex))
+        }
+
+        val inputFormatClass = partDesc.getInputFileFormatClass
+          .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+        arrayPartitionedFile += getArrayPartitionedFile(newJobConf, 
inputFormatClass, partitionInternalRow)
+      }
+      arrayPartitionedFile
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    } else {
+      val inputFormatClass = 
nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, 
Writable]]]
+      getArrayPartitionedFile(newJobConf, inputFormatClass, new 
GenericInternalRow(0))
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
+    }
+    arrayFilePartition += 
FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
+      partitionedFiles,
+      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray

Review Comment:
   `FilePartition.getFilePartitions(SparkSession.getActiveSession.get, ...)` 
again relies on `getActiveSession.get`. Use the `sparkSession` derived from 
`basedHiveScan` (as in `NativeHiveTableScanBase`) so partition planning doesn’t 
fail when there’s no active session.
   ```suggestion
       val sparkSession = basedHiveScan.sparkSession
       arrayFilePartition += FilePartition.getFilePartitions(
         sparkSession,
         partitionedFiles,
         getMaxSplitBytes(sparkSession)
       ).toArray
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to