Copilot commented on code in PR #1995: URL: https://github.com/apache/auron/pull/1995#discussion_r2792581956
########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import 
org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() Review Comment: `nativeHadoopConf` uses `SparkSession.getActiveSession.get`, which can throw if there is no active session (e.g., execution triggered outside a SQL context). Consider using the same session derivation as `NativeHiveTableScanBase.broadcastedHadoopConf` (`Shims.get.getSqlContext(basedHiveScan).sparkSession`) to avoid runtime failures. 
```suggestion val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession val hiveConf = sparkSession.sessionState.newHadoopConf() ``` ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.AuronConverters.getBooleanConf +import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive.execution.HiveTableScanExec + +class HiveConvertProvider extends AuronConvertProvider with Logging { + override def isEnabled: Boolean = + getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true) + + def enableHiveTableScanExec: Boolean = + getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false) + + override def isSupported(exec: SparkPlan): Boolean = + exec match { + case e: HiveTableScanExec if enableHiveTableScanExec && + e.relation.tableMeta.provider.isDefined && + e.relation.tableMeta.provider.get.equals("hive") => + true Review Comment: `isSupported` accepts all Hive tables with provider == "hive", but `NativeHiveTableScanExec` only builds native nodes for ORC/Parquet and otherwise will throw (e.g., `MatchError` on file format). Add an explicit format check here (or make the native exec gracefully fall back) to avoid runtime failures on non-ORC/Parquet Hive tables. ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} 
+import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. 
+ val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. + } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) Review Comment: `ignoreEmptySplits` also depends on `SparkSession.getActiveSession.get`. This should use the same non-optional session/context resolution as the rest of the execution code to avoid `NoSuchElementException` when there is no active session. ```suggestion private val minPartitions = if (sparkContext.isLocal) { 0 // will splitted based on block by default. } else { math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), sparkContext.defaultMinPartitions) } private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) ``` ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import 
org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. 
+ val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. + } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}" + 
putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "orc" => + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter Review Comment: `fileFormat match` only has cases for "orc" and "parquet". Since `HiveTableUtil.getFileFormat` can return "other", this can throw a `MatchError` at runtime. Add a default case that either throws a clear unsupported-format error or triggers a non-native fallback. ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + 
@transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. + val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. 
+ } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "orc" => + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new 
java.util.ArrayList()) // not support this filter + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + case "parquet" => + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.HiveTableScan") + } + + override def getFilePartitions(): Array[FilePartition] = { + val newJobConf = new JobConf(nativeHadoopConf) + val arrayFilePartition = ArrayBuffer[FilePartition]() + val partitionedFiles = if (relation.isPartitioned) { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + arrayPartitionedFile += getArrayPartitionedFile(newJobConf, inputFormatClass, partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } else { + val inputFormatClass = nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]] + getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0)) + 
.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get, + partitionedFiles, + getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray + arrayFilePartition.toArray + } + + private def getMaxSplitBytes(sparkSession: SparkSession): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + Math.min(defaultMaxSplitBytes, openCostInBytes) + } + + private def getArrayPartitionedFile(newJobConf: JobConf, + inputFormatClass: Class[newInputClass[Writable, Writable]], + partitionInternalRow: GenericInternalRow): ArrayBuffer[PartitionedFile] = { + val allInputSplits = getInputFormat(newJobConf, inputFormatClass).getSplits(newJobConf, minPartitions) + val inputSplits = if (ignoreEmptySplits) { + allInputSplits.filter(_.getLength > 0) + } else { + allInputSplits + } + inputFormatClass match { + case OrcInputFormat => + case MapredParquetInputFormat => + case _ => + } + val arrayFilePartition = ArrayBuffer[PartitionedFile]() + for (i <- 0 until inputSplits.size) { + val inputSplit = inputSplits(i) + inputSplit match { + case FileSplit => + val orcInputSplit = inputSplit.asInstanceOf[FileSplit] Review Comment: `inputSplit match { case FileSplit => ... }` won’t work: `FileSplit` is a class, not a matchable singleton. Use a typed pattern like `case fs: FileSplit =>` and avoid re-casting the same value immediately after matching. ```suggestion case orcInputSplit: FileSplit => ``` ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import 
org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. 
+ val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. + } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}" + 
putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "orc" => + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + case "parquet" => + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.HiveTableScan") + } + + override def getFilePartitions(): Array[FilePartition] = { + val newJobConf = new JobConf(nativeHadoopConf) + val arrayFilePartition = ArrayBuffer[FilePartition]() + val partitionedFiles = if (relation.isPartitioned) { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + 
for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + arrayPartitionedFile += getArrayPartitionedFile(newJobConf, inputFormatClass, partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } else { + val inputFormatClass = nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]] + getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0)) + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get, + partitionedFiles, + getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray + arrayFilePartition.toArray + } + + private def getMaxSplitBytes(sparkSession: SparkSession): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + Math.min(defaultMaxSplitBytes, openCostInBytes) Review Comment: `getMaxSplitBytes` currently returns `min(filesMaxPartitionBytes, filesOpenCostInBytes)`, which can drastically shrink splits and create excessive partitions. Elsewhere in this repo (`NativePaimonTableScanExec`) you fork Spark’s `FilePartition#maxSplitBytes` logic using `min(defaultMaxSplitBytes, max(openCostInBytes, bytesPerCore))`. Align this implementation to that logic (or call the shared helper) to avoid performance regressions. 
```suggestion getMaxSplitBytes(SparkSession.getActiveSession.get, partitionedFiles)).toArray arrayFilePartition.toArray } private def getMaxSplitBytes( sparkSession: SparkSession, partitionedFiles: Seq[PartitionedFile]): Long = { val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes val totalBytes = partitionedFiles.map(_.length).sum val parallelism = math.max(1, sparkSession.sparkContext.defaultParallelism) val bytesPerCore = if (totalBytes <= 0L) openCostInBytes else totalBytes / parallelism Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) ``` ########## spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala: ########## @@ -1048,6 +1015,11 @@ object AuronConverters extends Logging { convertToNative(exec) } + def convertHiveTableScanExec(exec: HiveTableScanExec): SparkPlan = { + logDebugPlanConversion(exec) + Review Comment: `convertHiveTableScanExec` has return type `SparkPlan` but currently has no implementation/return value. As written it won’t compile; either implement the conversion (or delegate to the external provider) or remove the unused stub until it’s ready. 
```suggestion convertToNative(exec) ``` ########## spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala: ########## @@ -16,69 +16,39 @@ */ package org.apache.spark.sql.auron -import java.util.ServiceLoader - -import scala.annotation.tailrec -import scala.collection.JavaConverters._ -import scala.collection.mutable - +import org.apache.auron.configuration.AuronConfiguration +import org.apache.auron.jni.AuronAdaptor +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.protobuf.{EmptyPartitionsExecNode, PhysicalPlanNode} +import org.apache.auron.spark.configuration.SparkAuronConfiguration +import org.apache.auron.sparkver import org.apache.commons.lang3.reflect.MethodUtils import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat import org.apache.spark.Partition import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.{config, Logging} -import org.apache.spark.sql.auron.AuronConvertStrategy.{childOrderingRequiredTag, convertibleTag, convertStrategyTag, convertToNonNativeTag, isNeverConvert, joinSmallerSideTag, neverConvertReasonTag} -import org.apache.spark.sql.auron.NativeConverters.{existTimestampType, isTypeSupported, roundRobinTypeSupported, StubExpr} +import org.apache.spark.internal.{Logging, config} +import org.apache.spark.sql.auron.AuronConvertStrategy._ +import org.apache.spark.sql.auron.NativeConverters.{StubExpr, existTimestampType, isTypeSupported, roundRobinTypeSupported} import org.apache.spark.sql.auron.join.JoinBuildSides.{JoinBuildLeft, JoinBuildRight, JoinBuildSide} import org.apache.spark.sql.auron.util.AuronLogUtils.logDebugPlanConversion -import org.apache.spark.sql.catalyst.expressions.AggregateWindowFunction -import org.apache.spark.sql.catalyst.expressions.Alias -import org.apache.spark.sql.catalyst.expressions.Ascending -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import 
org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.expressions.NamedExpression -import org.apache.spark.sql.catalyst.expressions.SortOrder -import org.apache.spark.sql.catalyst.expressions.WindowExpression -import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction -import org.apache.spark.sql.catalyst.expressions.aggregate.Final -import org.apache.spark.sql.catalyst.expressions.aggregate.Partial -import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.catalyst.plans.physical.RangePartitioning -import org.apache.spark.sql.catalyst.plans.physical.RoundRobinPartitioning +import org.apache.spark.sql.catalyst.expressions.{AggregateWindowFunction, Alias, Ascending, Attribute, AttributeReference, Expression, Literal, NamedExpression, SortOrder, WindowExpression, WindowSpecDefinition} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Final, Partial} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.aggregate.HashAggregateExec -import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec -import org.apache.spark.sql.execution.aggregate.SortAggregateExec -import org.apache.spark.sql.execution.auron.plan.ConvertToNativeBase -import org.apache.spark.sql.execution.auron.plan.NativeAggBase -import org.apache.spark.sql.execution.auron.plan.NativeBroadcastExchangeBase -import org.apache.spark.sql.execution.auron.plan.NativeOrcScanBase -import 
org.apache.spark.sql.execution.auron.plan.NativeParquetScanBase -import org.apache.spark.sql.execution.auron.plan.NativeSortBase -import org.apache.spark.sql.execution.auron.plan.NativeUnionBase -import org.apache.spark.sql.execution.auron.plan.Util +import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.auron.plan._ import org.apache.spark.sql.execution.command.DataWritingCommandExec -import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.hive.execution.InsertIntoHiveTable import org.apache.spark.sql.hive.execution.auron.plan.NativeHiveTableScanBase import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType -import org.apache.auron.configuration.AuronConfiguration -import org.apache.auron.jni.AuronAdaptor -import org.apache.auron.metric.SparkMetricNode -import org.apache.auron.protobuf.EmptyPartitionsExecNode -import org.apache.auron.protobuf.PhysicalPlanNode -import org.apache.auron.spark.configuration.SparkAuronConfiguration -import org.apache.auron.sparkver +import java.util.ServiceLoader +import scala.annotation.tailrec Review Comment: `scala.collection.mutable` import was removed, but this file still references `mutable.LinkedHashMap` later (e.g., around the partial-agg projection logic). This will not compile unless `scala.collection.mutable` (or the specific types used) are re-imported or fully qualified. 
```suggestion import scala.annotation.tailrec import scala.collection.mutable ``` ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient 
private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. + val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. + } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) Review Comment: `minPartitions` reads `SparkSession.getActiveSession.get.sparkContext` multiple times. 
Besides the `.get` risk, it’s also inconsistent with other native scan implementations in this repo that pass an explicit `sparkSession` around. Prefer using a single `sparkSession` resolved from `basedHiveScan` and derive `sparkContext` from it. ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} Review Comment: File name `NativeHIveTableScanExec.scala` has inconsistent casing ("HIve") compared to the class `NativeHiveTableScanExec`. On case-sensitive filesystems this is still valid but is easy to miss/grep incorrectly; consider renaming the file to `NativeHiveTableScanExec.scala` for consistency. ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. 
+ val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. + } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}" + 
putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "orc" => + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + case "parquet" => + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.HiveTableScan") + } + + override def getFilePartitions(): Array[FilePartition] = { + val newJobConf = new JobConf(nativeHadoopConf) + val arrayFilePartition = ArrayBuffer[FilePartition]() + val partitionedFiles = if (relation.isPartitioned) { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + 
for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + arrayPartitionedFile += getArrayPartitionedFile(newJobConf, inputFormatClass, partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } else { + val inputFormatClass = nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]] + getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0)) + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get, + partitionedFiles, + getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray + arrayFilePartition.toArray + } + + private def getMaxSplitBytes(sparkSession: SparkSession): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + Math.min(defaultMaxSplitBytes, openCostInBytes) + } + + private def getArrayPartitionedFile(newJobConf: JobConf, + inputFormatClass: Class[newInputClass[Writable, Writable]], + partitionInternalRow: GenericInternalRow): ArrayBuffer[PartitionedFile] = { + val allInputSplits = getInputFormat(newJobConf, inputFormatClass).getSplits(newJobConf, minPartitions) + val inputSplits = if (ignoreEmptySplits) { + allInputSplits.filter(_.getLength > 0) + } else { + allInputSplits + } + inputFormatClass match { + case OrcInputFormat => + case MapredParquetInputFormat => + case _ => + } + val arrayFilePartition = ArrayBuffer[PartitionedFile]() + for (i <- 0 until inputSplits.size) { + val inputSplit = inputSplits(i) + inputSplit match { + case FileSplit => + val orcInputSplit = inputSplit.asInstanceOf[FileSplit] + 
arrayFilePartition += + Shims.get.getPartitionedFile(partitionInternalRow, orcInputSplit.getPath.toString, + orcInputSplit.getStart, orcInputSplit.getLength) + } + } + arrayFilePartition + } + + private def getInputFormat(conf: JobConf, inputFormatClass: Class[newInputClass[Writable, Writable]]): + InputFormat[Writable, Writable] = { + val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) + .asInstanceOf[InputFormat[Writable, Writable]] + newInputFormat match { Review Comment: `getInputFormat` takes a `mapreduce.InputFormat` class (`newInputClass`) but returns/instantiates `org.apache.hadoop.mapred.InputFormat`. This signature mismatch makes the unchecked cast even riskier. Align the parameter type with the returned InputFormat type (or vice versa) so the compiler can help enforce correctness. ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient 
private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. + val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. 
+ } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "orc" => + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new 
java.util.ArrayList()) // not support this filter + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + case "parquet" => + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.HiveTableScan") + } + + override def getFilePartitions(): Array[FilePartition] = { + val newJobConf = new JobConf(nativeHadoopConf) + val arrayFilePartition = ArrayBuffer[FilePartition]() + val partitionedFiles = if (relation.isPartitioned) { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } Review Comment: Partition values are populated into `GenericInternalRow` using raw strings from Hive metastore (`partition.getTPartition.getValues`). Spark scan planning usually casts these strings to the partition schema types (and handles DEFAULT_PARTITION_NAME -> null / time zones). Without that casting, partition values/types may be incorrect. Consider building the partition `InternalRow` via the same cast/toRow approach used in `NativePaimonTableScanExec` (or existing Spark/Hive utilities). 
########## spark-extension-shims-spark/pom.xml: ########## @@ -36,6 +36,15 @@ <artifactId>auron-spark-ui_${scalaVersion}</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_${scalaVersion}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_${scalaVersion}</artifactId> + <scope>provided</scope> + </dependency> Review Comment: `spark-hive_${scalaVersion}` is declared twice: once with default scope (compile) and again with `<scope>provided</scope>`. This can unintentionally bundle Spark Hive classes into this module’s artifact and/or cause dependency resolution conflicts. Keep a single dependency entry with the intended scope. ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient 
private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. + val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. 
+ } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "orc" => + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new 
java.util.ArrayList()) // not support this filter + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + case "parquet" => + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.HiveTableScan") + } + + override def getFilePartitions(): Array[FilePartition] = { + val newJobConf = new JobConf(nativeHadoopConf) + val arrayFilePartition = ArrayBuffer[FilePartition]() + val partitionedFiles = if (relation.isPartitioned) { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + arrayPartitionedFile += getArrayPartitionedFile(newJobConf, inputFormatClass, partitionInternalRow) + } Review Comment: In `getFilePartitions`, `arrayPartitionedFile` is an `ArrayBuffer[PartitionedFile]`, but the code uses `+= getArrayPartitionedFile(...)` where `getArrayPartitionedFile` returns an `ArrayBuffer[PartitionedFile]`. This is a type mismatch and won’t compile; use `++=` (or change the helper to return a single `PartitionedFile`). 
########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import 
org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. 
+ val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. + } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}" + 
putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "orc" => + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + case "parquet" => + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.HiveTableScan") + } + + override def getFilePartitions(): Array[FilePartition] = { + val newJobConf = new JobConf(nativeHadoopConf) + val arrayFilePartition = ArrayBuffer[FilePartition]() + val partitionedFiles = if (relation.isPartitioned) { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + 
for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + arrayPartitionedFile += getArrayPartitionedFile(newJobConf, inputFormatClass, partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } else { + val inputFormatClass = nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]] + getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0)) + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get, + partitionedFiles, + getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray + arrayFilePartition.toArray + } + + private def getMaxSplitBytes(sparkSession: SparkSession): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + Math.min(defaultMaxSplitBytes, openCostInBytes) + } + + private def getArrayPartitionedFile(newJobConf: JobConf, + inputFormatClass: Class[newInputClass[Writable, Writable]], + partitionInternalRow: GenericInternalRow): ArrayBuffer[PartitionedFile] = { + val allInputSplits = getInputFormat(newJobConf, inputFormatClass).getSplits(newJobConf, minPartitions) Review Comment: `getArrayPartitionedFile` declares `inputFormatClass` as a Hadoop `mapreduce.InputFormat` (`newInputClass`), but the implementation instantiates/casts it to the old `mapred.InputFormat` and uses `JobConf`/`getSplits(JobConf, Int)`. This API mismatch is unsafe (compile-time and runtime). Use a consistent InputFormat API (likely `org.apache.hadoop.mapred.InputFormat` given `JobConf`/`FileSplit` usage). 
########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.AuronConverters.getBooleanConf +import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive.execution.HiveTableScanExec + +class HiveConvertProvider extends AuronConvertProvider with Logging { + override def isEnabled: Boolean = + getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true) + + def enableHiveTableScanExec: Boolean = + getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false) + Review Comment: `HiveConvertProvider` is discovered via `ServiceLoader` (`AuronConverters` loads `AuronConvertProvider` implementations). This module currently doesn’t include a `META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider` entry, so the provider won’t be loaded at runtime (see `thirdparty/auron-paimon` for the existing pattern). 
########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.AuronConverters.getBooleanConf +import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive.execution.HiveTableScanExec + +class HiveConvertProvider extends AuronConvertProvider with Logging { + override def isEnabled: Boolean = + getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true) + + def enableHiveTableScanExec: Boolean = + getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false) + + override def isSupported(exec: SparkPlan): Boolean = + exec match { + case e: HiveTableScanExec if enableHiveTableScanExec && + e.relation.tableMeta.provider.isDefined && + e.relation.tableMeta.provider.get.equals("hive") => + true + case _ => false + } + + override def convert(exec: SparkPlan): SparkPlan = { + exec match { + case hiveExec: HiveTableScanExec if 
enableHiveTableScanExec => + convertHiveTableScanExec(hiveExec) + case _ => exec + } + } + + def convertHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = { + AuronConverters.addRenameColumnsExec(NativeHiveTableScanExec(hiveExec)) + } Review Comment: There are existing query/operator validation test utilities in this module, but this new Hive scan conversion path doesn’t appear to have coverage. Add at least one suite that enables `spark.auron.enable.hiveTableScanExec` and verifies `HiveTableScanExec` is converted (and that unsupported formats don’t break execution). ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient 
private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. + val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. 
+ } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "orc" => + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new 
java.util.ArrayList()) // not support this filter + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + case "parquet" => + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.HiveTableScan") + } + + override def getFilePartitions(): Array[FilePartition] = { + val newJobConf = new JobConf(nativeHadoopConf) + val arrayFilePartition = ArrayBuffer[FilePartition]() + val partitionedFiles = if (relation.isPartitioned) { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + arrayPartitionedFile += getArrayPartitionedFile(newJobConf, inputFormatClass, partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } else { + val inputFormatClass = nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]] + getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0)) + 
.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get, + partitionedFiles, + getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray + arrayFilePartition.toArray + } + + private def getMaxSplitBytes(sparkSession: SparkSession): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + Math.min(defaultMaxSplitBytes, openCostInBytes) + } + + private def getArrayPartitionedFile(newJobConf: JobConf, + inputFormatClass: Class[newInputClass[Writable, Writable]], + partitionInternalRow: GenericInternalRow): ArrayBuffer[PartitionedFile] = { + val allInputSplits = getInputFormat(newJobConf, inputFormatClass).getSplits(newJobConf, minPartitions) + val inputSplits = if (ignoreEmptySplits) { + allInputSplits.filter(_.getLength > 0) + } else { + allInputSplits + } + inputFormatClass match { + case OrcInputFormat => + case MapredParquetInputFormat => + case _ => + } Review Comment: `inputFormatClass match { case OrcInputFormat => ... }` is matching a `Class[_]` value against a class name, and the cases are empty. If you need special handling by input format, compare against `classOf[OrcInputFormat]` / `classOf[MapredParquetInputFormat]` and implement the intended behavior; otherwise remove this dead code block. ```suggestion ``` ########## spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.{protobuf => pb} +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import 
org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.{Partition, TaskContext} + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. 
+ val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. + } else { + math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}" + 
putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "orc" => + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + case "parquet" => + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.HiveTableScan") + } + + override def getFilePartitions(): Array[FilePartition] = { + val newJobConf = new JobConf(nativeHadoopConf) + val arrayFilePartition = ArrayBuffer[FilePartition]() + val partitionedFiles = if (relation.isPartitioned) { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + 
for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + arrayPartitionedFile += getArrayPartitionedFile(newJobConf, inputFormatClass, partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } else { + val inputFormatClass = nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]] + getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0)) + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get, + partitionedFiles, + getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray Review Comment: `FilePartition.getFilePartitions(SparkSession.getActiveSession.get, ...)` again relies on `getActiveSession.get`. Use the `sparkSession` derived from `basedHiveScan` (as in `NativeHiveTableScanBase`) so partition planning doesn’t fail when there’s no active session. Also note that `+=` appends the `Array[FilePartition]` produced by `.toArray` as a single element to an `ArrayBuffer[FilePartition]`, which will not type-check; use `++=` with the `Seq[FilePartition]` returned by `getFilePartitions` instead. ```suggestion val sparkSession = basedHiveScan.sparkSession arrayFilePartition ++= FilePartition.getFilePartitions( sparkSession, partitionedFiles, getMaxSplitBytes(sparkSession) ) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
