This is an automated email from the ASF dual-hosted git repository.
rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6548ab4941 [VL] Support mapping columns by position index for ORC and
Parquet files (#10697)
6548ab4941 is described below
commit 6548ab4941fc92d10f30725937ca4968f3c3bb24
Author: kevinwilfong <[email protected]>
AuthorDate: Wed Oct 15 07:31:45 2025 -0700
[VL] Support mapping columns by position index for ORC and Parquet files
(#10697)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 1 +
.../backendsapi/clickhouse/CHIteratorApi.scala | 1 +
.../gluten/backendsapi/velox/VeloxBackend.scala | 37 ++++++--
.../backendsapi/velox/VeloxIteratorApi.scala | 80 ++++++++++------
.../backendsapi/velox/VeloxValidatorApi.scala | 45 +++++----
.../org/apache/gluten/config/VeloxConfig.scala | 16 ++++
.../apache/gluten/execution/FallbackSuite.scala | 36 ++++++-
.../apache/gluten/execution/VeloxScanSuite.scala | 103 ++++++++++++++++++++-
cpp/velox/compute/VeloxPlanConverter.cc | 33 ++++++-
cpp/velox/compute/VeloxPlanConverter.h | 1 +
cpp/velox/compute/WholeStageResultIterator.cc | 4 +
cpp/velox/config/VeloxConfig.h | 2 +
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 11 ++-
cpp/velox/substrait/SubstraitToVeloxPlan.h | 3 +
cpp/velox/utils/ConfigExtractor.cc | 4 -
docs/velox-configuration.md | 4 +-
.../gluten/substrait/rel/LocalFilesNode.java | 22 +++--
.../gluten/backendsapi/BackendSettingsApi.scala | 5 +-
.../apache/gluten/backendsapi/IteratorApi.scala | 2 +
.../execution/BasicScanExecTransformer.scala | 17 +---
.../execution/BatchScanExecTransformer.scala | 1 +
21 files changed, 336 insertions(+), 92 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 141cdaa24c..0573d0cf38 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -181,6 +181,7 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
override def validateScanExec(
format: ReadFileFormat,
fields: Array[StructField],
+ dataSchema: StructType,
rootPaths: Seq[String],
properties: Map[String, String],
hadoopConf: Configuration): ValidationResult = {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 8d22bd2b1c..07c3d14409 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -127,6 +127,7 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
override def genSplitInfo(
partition: InputPartition,
partitionSchema: StructType,
+ dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 3967ab68d4..3da306e1e9 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -99,6 +99,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def validateScanExec(
format: ReadFileFormat,
fields: Array[StructField],
+ dataSchema: StructType,
rootPaths: Seq[String],
properties: Map[String, String],
hadoopConf: Configuration): ValidationResult = {
@@ -117,9 +118,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
def validateFormat(): Option[String] = {
- def validateTypes(validatorFunc: PartialFunction[StructField, String]):
Option[String] = {
+ def validateTypes(
+ validatorFunc: PartialFunction[StructField, String],
+ fieldsToValidate: Array[StructField]): Option[String] = {
// Collect unsupported types.
- val unsupportedDataTypeReason = fields.collect(validatorFunc)
+ val unsupportedDataTypeReason = fieldsToValidate.collect(validatorFunc)
if (unsupportedDataTypeReason.nonEmpty) {
Some(
s"Found unsupported data type in $format:
${unsupportedDataTypeReason.mkString(", ")}.")
@@ -152,7 +155,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
if (!VeloxConfig.get.veloxOrcScanEnabled) {
Some(s"Velox ORC scan is turned off,
${VeloxConfig.VELOX_ORC_SCAN_ENABLED.key}")
} else {
- val typeValidator: PartialFunction[StructField, String] = {
+ val fieldTypeValidator: PartialFunction[StructField, String] = {
case StructField(_, arrayType: ArrayType, _, _)
if arrayType.elementType.isInstanceOf[StructType] =>
"StructType as element in ArrayType"
@@ -165,12 +168,16 @@ object VeloxBackendSettings extends BackendSettingsApi {
case StructField(_, mapType: MapType, _, _)
if mapType.valueType.isInstanceOf[ArrayType] =>
"ArrayType as Value in MapType"
+ case StructField(_, TimestampType, _, _) => "TimestampType"
+ }
+
+ val schemaTypeValidator: PartialFunction[StructField, String] = {
case StructField(_, stringType: StringType, _, metadata)
if isCharType(stringType, metadata) =>
CharVarcharUtils.getRawTypeString(metadata) + "(force
fallback)"
- case StructField(_, TimestampType, _, _) => "TimestampType"
}
- validateTypes(typeValidator)
+ validateTypes(fieldTypeValidator, fields)
+ .orElse(validateTypes(schemaTypeValidator, dataSchema.fields))
}
case _ => Some(s"Unsupported file format $format.")
}
@@ -193,10 +200,28 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
}
+ def validateDataSchema(): Option[String] = {
+ if (VeloxConfig.get.parquetUseColumnNames &&
VeloxConfig.get.orcUseColumnNames) {
+ return None
+ }
+
+ // If we are using column indices for schema evolution, we need to pass
the table schema to
+ // Velox. We need to ensure all types in the table schema are supported.
+ val validationResults =
+ dataSchema.fields.flatMap(field =>
VeloxValidatorApi.validateSchema(field.dataType))
+ if (validationResults.nonEmpty) {
+ Some(s"""Found unsupported data type(s) in file
+ |schema: ${validationResults.mkString(", ")}.""".stripMargin)
+ } else {
+ None
+ }
+ }
+
val validationChecks = Seq(
validateScheme(),
validateFormat(),
- validateEncryption()
+ validateEncryption(),
+ validateDataSchema()
)
for (check <- validationChecks) {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index f603f454ac..4afad58c9a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -18,12 +18,13 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.backendsapi.{BackendsApiManager, IteratorApi}
import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName
+import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.execution._
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.metrics.{IMetrics, IteratorMetricsJniWrapper}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanNode
-import org.apache.gluten.substrait.rel.{LocalFilesBuilder, SplitInfo}
+import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode,
SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.vectorized._
@@ -49,9 +50,25 @@ import scala.collection.JavaConverters._
class VeloxIteratorApi extends IteratorApi with Logging {
+ private def setFileSchemaForLocalFiles(
+ localFilesNode: LocalFilesNode,
+ fileSchema: StructType,
+ fileFormat: ReadFileFormat): LocalFilesNode = {
+ if (
+ ((fileFormat == ReadFileFormat.OrcReadFormat || fileFormat ==
ReadFileFormat.DwrfReadFormat)
+ && !VeloxConfig.get.orcUseColumnNames)
+ || (fileFormat == ReadFileFormat.ParquetReadFormat &&
!VeloxConfig.get.parquetUseColumnNames)
+ ) {
+ localFilesNode.setFileSchema(fileSchema)
+ }
+
+ localFilesNode
+ }
+
override def genSplitInfo(
partition: InputPartition,
partitionSchema: StructType,
+ dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
@@ -69,19 +86,23 @@ class VeloxIteratorApi extends IteratorApi with Logging {
constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
- LocalFilesBuilder.makeLocalFiles(
- f.index,
- paths,
- starts,
- lengths,
- fileSizes,
- modificationTimes,
- partitionColumns,
- metadataColumns,
- fileFormat,
- preferredLocations.toList.asJava,
- mapAsJavaMap(properties),
- otherMetadataColumns
+ setFileSchemaForLocalFiles(
+ LocalFilesBuilder.makeLocalFiles(
+ f.index,
+ paths,
+ starts,
+ lengths,
+ fileSizes,
+ modificationTimes,
+ partitionColumns,
+ metadataColumns,
+ fileFormat,
+ preferredLocations.toList.asJava,
+ mapAsJavaMap(properties),
+ otherMetadataColumns
+ ),
+ dataSchema,
+ fileFormat
)
case _ =>
throw new UnsupportedOperationException(s"Unsupported input
partition.")
@@ -92,6 +113,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionIndex: Int,
partitions: Seq[InputPartition],
partitionSchema: StructType,
+ dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
@@ -115,19 +137,23 @@ class VeloxIteratorApi extends IteratorApi with Logging {
metadataColumns,
otherMetadataColumns) =
constructSplitInfo(partitionSchema, partitionFiles, metadataColumnNames)
- LocalFilesBuilder.makeLocalFiles(
- partitionIndex,
- paths,
- starts,
- lengths,
- fileSizes,
- modificationTimes,
- partitionColumns,
- metadataColumns,
- fileFormat,
- locations.toList.asJava,
- mapAsJavaMap(properties),
- otherMetadataColumns
+ setFileSchemaForLocalFiles(
+ LocalFilesBuilder.makeLocalFiles(
+ partitionIndex,
+ paths,
+ starts,
+ lengths,
+ fileSizes,
+ modificationTimes,
+ partitionColumns,
+ metadataColumns,
+ fileFormat,
+ locations.toList.asJava,
+ mapAsJavaMap(properties),
+ otherMetadataColumns
+ ),
+ dataSchema,
+ fileFormat
)
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
index 15c367628e..7b9cf91112 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
@@ -31,6 +31,7 @@ import org.apache.spark.task.TaskResources
import scala.collection.JavaConverters._
class VeloxValidatorApi extends ValidatorApi {
+ import VeloxValidatorApi._
/** For velox backend, key validation is on native side. */
override def doExprValidate(substraitExprName: String, expr: Expression):
Boolean =
@@ -53,6 +54,27 @@ class VeloxValidatorApi extends ValidatorApi {
info.fallbackInfo.asScala.reduce[String] { case (l, r) => l + "\n |-
" + r }))
}
+ override def doSchemaValidate(schema: DataType): Option[String] = {
+ validateSchema(schema)
+ }
+
+ override def doColumnarShuffleExchangeExecValidate(
+ outputAttributes: Seq[Attribute],
+ outputPartitioning: Partitioning,
+ child: SparkPlan): Option[String] = {
+ if (outputAttributes.isEmpty) {
+ // See: https://github.com/apache/incubator-gluten/issues/7600.
+ return Some("Shuffle with empty output schema is not supported")
+ }
+ if (child.output.isEmpty) {
+ // See: https://github.com/apache/incubator-gluten/issues/7600.
+ return Some("Shuffle with empty input schema is not supported")
+ }
+ doSchemaValidate(child.schema)
+ }
+}
+
+object VeloxValidatorApi {
private def isPrimitiveType(dataType: DataType): Boolean = {
dataType match {
case BooleanType | ByteType | ShortType | IntegerType | LongType |
FloatType | DoubleType |
@@ -63,41 +85,26 @@ class VeloxValidatorApi extends ValidatorApi {
}
}
- override def doSchemaValidate(schema: DataType): Option[String] = {
+ def validateSchema(schema: DataType): Option[String] = {
if (isPrimitiveType(schema)) {
return None
}
schema match {
case map: MapType =>
- doSchemaValidate(map.keyType).orElse(doSchemaValidate(map.valueType))
+ validateSchema(map.keyType).orElse(validateSchema(map.valueType))
case struct: StructType =>
struct.foreach {
field =>
- val reason = doSchemaValidate(field.dataType)
+ val reason = validateSchema(field.dataType)
if (reason.isDefined) {
return reason
}
}
None
case array: ArrayType =>
- doSchemaValidate(array.elementType)
+ validateSchema(array.elementType)
case _ =>
Some(s"Schema / data type not supported: $schema")
}
}
-
- override def doColumnarShuffleExchangeExecValidate(
- outputAttributes: Seq[Attribute],
- outputPartitioning: Partitioning,
- child: SparkPlan): Option[String] = {
- if (outputAttributes.isEmpty) {
- // See: https://github.com/apache/incubator-gluten/issues/7600.
- return Some("Shuffle with empty output schema is not supported")
- }
- if (child.output.isEmpty) {
- // See: https://github.com/apache/incubator-gluten/issues/7600.
- return Some("Shuffle with empty input schema is not supported")
- }
- doSchemaValidate(child.schema)
- }
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 1be6749e08..8aa2903540 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -77,6 +77,10 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def veloxPreferredBatchBytes: Long =
getConf(COLUMNAR_VELOX_PREFERRED_BATCH_BYTES)
def cudfEnableTableScan: Boolean = getConf(CUDF_ENABLE_TABLE_SCAN)
+
+ def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)
+
+ def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)
}
object VeloxConfig extends ConfigRegistry {
@@ -665,4 +669,16 @@ object VeloxConfig extends ConfigRegistry {
"instance per thread of execution.")
.intConf
.createWithDefault(100)
+
+ val ORC_USE_COLUMN_NAMES =
+ buildConf("spark.gluten.sql.columnar.backend.velox.orcUseColumnNames")
+ .doc("Maps table field names to file field names using names, not
indices for ORC files.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PARQUET_USE_COLUMN_NAMES =
+ buildConf("spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames")
+ .doc("Maps table field names to file field names using names, not
indices for Parquet files.")
+ .booleanConf
+ .createWithDefault(true)
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 83958537f1..c357e488b6 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec,
ColumnarShuffleExchangeExec, SparkPlan}
@@ -270,4 +270,38 @@ class FallbackSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSparkPl
}
}
}
+
+ testWithMinSparkVersion("fallback with index based schema evolution", "3.3")
{
+ val query = "SELECT c2 FROM test"
+ Seq("parquet", "orc").foreach {
+ format =>
+ Seq("true", "false").foreach {
+ parquetUseColumnNames =>
+ Seq("true", "false").foreach {
+ orcUseColumnNames =>
+ withSQLConf(
+ VeloxConfig.PARQUET_USE_COLUMN_NAMES.key ->
parquetUseColumnNames,
+ VeloxConfig.ORC_USE_COLUMN_NAMES.key -> orcUseColumnNames
+ ) {
+ withTable("test") {
+ spark
+ .range(100)
+ .selectExpr("to_timestamp_ntz(from_unixtime(id % 3)) as
c1", "id as c2")
+ .write
+ .format(format)
+ .saveAsTable("test")
+
+ runQueryAndCompare(query) {
+ df =>
+ val plan = df.queryExecution.executedPlan
+ val fallback = parquetUseColumnNames == "false" ||
+ orcUseColumnNames == "false"
+ assert(collect(plan) { case g: GlutenPlan => g
}.isEmpty == fallback)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
index 5caf3df832..050fdaf7bf 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
import org.apache.gluten.benchmarks.RandomParquetDataGenerator
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.utils.VeloxFileSystemValidationJniWrapper
import org.apache.spark.SparkConf
@@ -28,6 +28,8 @@ import org.apache.spark.sql.execution.ScalarSubquery
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import scala.reflect.ClassTag
+
class VeloxScanSuite extends VeloxWholeStageTransformerSuite {
protected val rootPath: String = getClass.getResource("/").getPath
override protected val resourcePath: String = "/tpch-data-parquet"
@@ -44,6 +46,12 @@ class VeloxScanSuite extends VeloxWholeStageTransformerSuite
{
super.beforeAll()
}
+ def checkQuery[T <: GlutenPlan: ClassTag](query: String, expectedResults:
Seq[Row]): Unit = {
+ val df = sql(query)
+ checkAnswer(df, expectedResults)
+ checkGlutenOperatorMatch[T](df)
+ }
+
test("tpch q22 subquery filter pushdown - v1") {
createTPCHNotNullTables()
runTPCHQuery(22, tpchQueries, queriesResults, compareResult = false,
noFallBack = false) {
@@ -202,9 +210,98 @@ class VeloxScanSuite extends
VeloxWholeStageTransformerSuite {
withTable("test") {
sql("create table test (a long, b string) using parquet options
(path '" + path + "')")
- val df = sql("select b from test group by b order by b")
- checkAnswer(df, Seq(Row("10"), Row("11")))
+ checkQuery[FileSourceScanExecTransformer](
+ "select b from test group by b order by b",
+ Seq(Row("10"), Row("11")))
}
}
}
+
+ test("parquet index based schema evolution") {
+ withSQLConf(
+ VeloxConfig.PARQUET_USE_COLUMN_NAMES.key -> "false",
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+ withTempDir {
+ dir =>
+ val path = dir.getCanonicalPath
+ spark
+ .range(2)
+ .selectExpr("id as a", "cast(id + 10 as string) as b")
+ .write
+ .mode("overwrite")
+ .parquet(path)
+
+ withTable("test") {
+ sql(s"""create table test (c long, d string, e float) using
parquet options
+ |(path '$path')""".stripMargin)
+
+ checkQuery[FileSourceScanExecTransformer](
+ "select c, d from test",
+ Seq(Row(0L, "10"), Row(1L, "11")))
+
+ checkQuery[FileSourceScanExecTransformer](
+ "select d from test",
+ Seq(Row("10"), Row("11")))
+
+ checkQuery[FileSourceScanExecTransformer]("select c from test",
Seq(Row(0L), Row(1L)))
+
+ checkQuery[FileSourceScanExecTransformer](
+ "select d, c from test",
+ Seq(Row("10", 0L), Row("11", 1L)))
+
+ checkQuery[FileSourceScanExecTransformer](
+ "select c, d, e from test",
+ Seq(Row(0L, "10", null), Row(1L, "11", null)))
+
+ checkQuery[FileSourceScanExecTransformer](
+ "select e, d, c from test",
+ Seq(Row(null, "10", 0L), Row(null, "11", 1L)))
+ }
+ }
+ }
+ }
+
+ test("ORC index based schema evolution") {
+ withSQLConf(
+ VeloxConfig.ORC_USE_COLUMN_NAMES.key -> "false",
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+ withTempDir {
+ dir =>
+ val path = dir.getCanonicalPath
+ spark
+ .range(2)
+ .selectExpr("id as a", "cast(id + 10 as string) as b")
+ .write
+ .mode("overwrite")
+ .orc(path)
+
+ withTable("test") {
+ sql(s"""create table test (c long, d string, e float) using orc
options
+ |(path '$path')""".stripMargin)
+
+ checkQuery[FileSourceScanExecTransformer](
+ "select c, d from test",
+ Seq(Row(0L, "10"), Row(1L, "11")))
+
+ checkQuery[FileSourceScanExecTransformer](
+ "select d from test",
+ Seq(Row("10"), Row("11")))
+
+ checkQuery[FileSourceScanExecTransformer]("select c from test",
Seq(Row(0L), Row(1L)))
+
+ checkQuery[FileSourceScanExecTransformer](
+ "select d, c from test",
+ Seq(Row("10", 0L), Row("11", 1L)))
+
+ checkQuery[FileSourceScanExecTransformer](
+ "select c, d, e from test",
+ Seq(Row(0L, "10", null), Row(1L, "11", null)))
+
+ checkQuery[FileSourceScanExecTransformer](
+ "select e, d, c from test",
+ Seq(Row(null, "10", 0L), Row(null, "11", 1L)))
+ }
+ }
+ }
+ }
}
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc
b/cpp/velox/compute/VeloxPlanConverter.cc
index 7b58584c34..88eceb3a74 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -36,12 +36,15 @@ VeloxPlanConverter::VeloxPlanConverter(
const std::optional<std::string> writeFileName,
bool validationMode)
: validationMode_(validationMode),
+ veloxCfg_(veloxCfg),
substraitVeloxPlanConverter_(veloxPool, veloxCfg, writeFilesTempPath,
writeFileName, validationMode) {
+ VELOX_USER_CHECK_NOT_NULL(veloxCfg_);
substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
}
namespace {
std::shared_ptr<SplitInfo> parseScanSplitInfo(
+ const facebook::velox::config::ConfigBase* veloxCfg,
const
google::protobuf::RepeatedPtrField<substrait::ReadRel_LocalFiles_FileOrFiles>&
fileList) {
using SubstraitFileFormatCase =
::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase;
@@ -98,18 +101,44 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
splitInfo->format = dwio::common::FileFormat::UNKNOWN;
break;
}
+
+ // The schema in file represents the table schema, it is set when the
TableScan requires the
+ // table schema to be present, currently when the option is set to map
columns by index rather
+ // than by name in Parquet or ORC files. Since the table schema should be
the same for all
+ // files, we set it in the SplitInfo based on the first file we encounter
with the schema set.
+ if (!splitInfo->tableSchema && file.has_schema()) {
+ const auto& schema = file.schema();
+
+ std::vector<std::string> names;
+ std::vector<TypePtr> types;
+ names.reserve(schema.names().size());
+
+ const bool asLowerCase = !veloxCfg->get<bool>(kCaseSensitive, false);
+ for (const auto& name : schema.names()) {
+ std::string fieldName = name;
+ if (asLowerCase) {
+ folly::toLowerAscii(fieldName);
+ }
+ names.emplace_back(std::move(fieldName));
+ }
+ types = SubstraitParser::parseNamedStruct(schema, asLowerCase);
+
+ splitInfo->tableSchema = ROW(std::move(names), std::move(types));
+ }
}
return splitInfo;
}
void parseLocalFileNodes(
SubstraitToVeloxPlanConverter* planConverter,
+ const facebook::velox::config::ConfigBase* veloxCfg,
std::vector<::substrait::ReadRel_LocalFiles>& localFiles) {
std::vector<std::shared_ptr<SplitInfo>> splitInfos;
splitInfos.reserve(localFiles.size());
for (const auto& localFile : localFiles) {
const auto& fileList = localFile.items();
- splitInfos.push_back(parseScanSplitInfo(fileList));
+
+ splitInfos.push_back(parseScanSplitInfo(veloxCfg, fileList));
}
planConverter->setSplitInfos(std::move(splitInfos));
@@ -120,7 +149,7 @@ std::shared_ptr<const facebook::velox::core::PlanNode>
VeloxPlanConverter::toVel
const ::substrait::Plan& substraitPlan,
std::vector<::substrait::ReadRel_LocalFiles> localFiles) {
if (!validationMode_) {
- parseLocalFileNodes(&substraitVeloxPlanConverter_, localFiles);
+ parseLocalFileNodes(&substraitVeloxPlanConverter_, veloxCfg_, localFiles);
}
return substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan);
diff --git a/cpp/velox/compute/VeloxPlanConverter.h
b/cpp/velox/compute/VeloxPlanConverter.h
index 7a14693cb7..4678dccea7 100644
--- a/cpp/velox/compute/VeloxPlanConverter.h
+++ b/cpp/velox/compute/VeloxPlanConverter.h
@@ -48,6 +48,7 @@ class VeloxPlanConverter {
private:
bool validationMode_;
+ const facebook::velox::config::ConfigBase* veloxCfg_;
SubstraitToVeloxPlanConverter substraitVeloxPlanConverter_;
};
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 08047b34fb..4aacc43b7f 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -699,6 +699,10 @@ std::shared_ptr<velox::config::ConfigBase>
WholeStageResultIterator::createConne
std::to_string(veloxCfg_->get<int32_t>(kMaxPartitions, 10000));
configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] =
std::to_string(veloxCfg_->get<bool>(kIgnoreMissingFiles, false));
+ configs[velox::connector::hive::HiveConfig::kParquetUseColumnNamesSession] =
+ std::to_string(veloxCfg_->get<bool>(kParquetUseColumnNames, true));
+ configs[velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] =
+ std::to_string(veloxCfg_->get<bool>(kOrcUseColumnNames, true));
return std::make_shared<velox::config::ConfigBase>(std::move(configs));
}
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 3d3a7d36bf..2f3aad2187 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -147,6 +147,8 @@ const std::string kMaxCoalescedBytes =
"spark.gluten.sql.columnar.backend.velox.
const std::string kCachePrefetchMinPct =
"spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct";
const std::string kMemoryPoolCapacityTransferAcrossTasks =
"spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks";
+const std::string kOrcUseColumnNames =
"spark.gluten.sql.columnar.backend.velox.orcUseColumnNames";
+const std::string kParquetUseColumnNames =
"spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames";
// write files
const std::string kMaxPartitions =
"spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession";
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 7e4cd1f514..5b76c22d8c 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1330,9 +1330,14 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
bool filterPushdownEnabled = true;
auto names = colNameList;
auto types = veloxTypeList;
- auto dataColumns = ROW(std::move(names), std::move(types));
+
+ // The columns we project from the file.
+ auto baseSchema = ROW(std::move(names), std::move(types));
+ // The columns present in the table, if not available default to the
baseSchema.
+ auto tableSchema = splitInfo->tableSchema ? splitInfo->tableSchema :
baseSchema;
+
connector::ConnectorTableHandlePtr tableHandle;
- auto remainingFilter = readRel.has_filter() ?
exprConverter_->toVeloxExpr(readRel.filter(), dataColumns) : nullptr;
+ auto remainingFilter = readRel.has_filter() ?
exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr;
auto connectorId = kHiveConnectorId;
if (useCudfTableHandle(splitInfos_) &&
veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
@@ -1342,7 +1347,7 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
}
common::SubfieldFilters subfieldFilters;
tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
- connectorId, "hive_table", filterPushdownEnabled,
std::move(subfieldFilters), remainingFilter, dataColumns);
+ connectorId, "hive_table", filterPushdownEnabled,
std::move(subfieldFilters), remainingFilter, tableSchema);
// Get assignments and out names.
std::vector<std::string> outNames;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index b00c3447f7..e1c6cee63c 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -55,6 +55,9 @@ struct SplitInfo {
/// The file sizes and modification times of the files to be scanned.
std::vector<std::optional<facebook::velox::FileProperties>> properties;
+ /// The schema of the table being scanned.
+ RowTypePtr tableSchema;
+
/// Make SplitInfo polymorphic
virtual ~SplitInfo() = default;
diff --git a/cpp/velox/utils/ConfigExtractor.cc
b/cpp/velox/utils/ConfigExtractor.cc
index 96da5069a2..1d4cd7f859 100644
--- a/cpp/velox/utils/ConfigExtractor.cc
+++ b/cpp/velox/utils/ConfigExtractor.cc
@@ -277,10 +277,6 @@ std::shared_ptr<facebook::velox::config::ConfigBase>
getHiveConfig(
// read as UTC
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kReadTimestampPartitionValueAsLocalTime]
= "false";
- // Maps table field names to file field names using names, not indices.
-
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kParquetUseColumnNames]
= "true";
-
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kOrcUseColumnNames] =
"true";
-
return
std::make_shared<facebook::velox::config::ConfigBase>(std::move(hiveConfMap));
}
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index 160f0c88ae..44996a5cd4 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -9,7 +9,7 @@ nav_order: 16
## Gluten Velox backend configurations
-| Key
| Default |
Description
[...]
+| Key
| Default | Description
[...]
|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| spark.gluten.sql.columnar.backend.velox.IOThreads
| <undefined> | The Size of the IO thread pool in the Connector. This
thread pool is used for split preloading and DirectBufferedInput. By default,
the value is the same as the maximum task slots per Spark executor.
[...]
| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver
| 2 | The split preload per task
[...]
@@ -48,6 +48,8 @@ nav_order: 16
|
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks
| true | Whether to allow memory capacity transfer between memory
pools from different tasks.
[...]
| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages
| false | Use explicit huge pages for Velox memory allocation.
[...]
| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled
| true | Enable velox orc scan. If disabled, vanilla spark orc
scan will be used.
[...]
+| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames
| true | Maps table field names to file field names using
names, not indices for ORC files. If this is set to false Gluten will fallback
to vanilla Spark if it does not support all column types present in any of the
schemas of the tables being read, at this time unsupported types include
TimestampNTZ and Char.
[...]
+| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames
| true | Maps table field names to file field names using
names, not indices for Parquet files. If this is set to false Gluten will
fallback to vanilla Spark if it does not support all column types present in
any of the schemas of the tables being read, at this time unsupported types
include TimestampNTZ and Char.
[...]
| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups
| 1 | Set the prefetch row groups for velox file scan
[...]
| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled
| false | Enable query tracing flag.
[...]
| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs
| 3600000ms | The max time in ms to wait for memory reclaim.
[...]
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index 7a17372ec2..5a1b106db8 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -109,15 +109,14 @@ public class LocalFilesNode implements SplitInfo {
private NamedStruct buildNamedStruct() {
NamedStruct.Builder namedStructBuilder = NamedStruct.newBuilder();
- if (fileSchema != null) {
- Type.Struct.Builder structBuilder = Type.Struct.newBuilder();
- for (StructField field : fileSchema.fields()) {
- structBuilder.addTypes(
- ConverterUtils.getTypeNode(field.dataType(),
field.nullable()).toProtobuf());
-
namedStructBuilder.addNames(ConverterUtils.normalizeColName(field.name()));
- }
- namedStructBuilder.setStruct(structBuilder.build());
+ Type.Struct.Builder structBuilder = Type.Struct.newBuilder();
+ for (StructField field : fileSchema.fields()) {
+ structBuilder.addTypes(
+ ConverterUtils.getTypeNode(field.dataType(),
field.nullable()).toProtobuf());
+
namedStructBuilder.addNames(ConverterUtils.normalizeColName(field.name()));
}
+ namedStructBuilder.setStruct(structBuilder.build());
+
return namedStructBuilder.build();
}
@@ -195,8 +194,11 @@ public class LocalFilesNode implements SplitInfo {
ReadRel.LocalFiles.FileOrFiles.metadataColumn.newBuilder();
fileBuilder.addMetadataColumns(mcBuilder.build());
}
- NamedStruct namedStruct = buildNamedStruct();
- fileBuilder.setSchema(namedStruct);
+
+ if (fileSchema != null) {
+ NamedStruct namedStruct = buildNamedStruct();
+ fileBuilder.setSchema(namedStruct);
+ }
if (!otherMetadataColumns.isEmpty()) {
Map<String, Object> otherMetadatas = otherMetadataColumns.get(i);
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index eea34e8c44..7c84218681 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.connector.read.Scan
import
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat,
InsertIntoHadoopFsRelationCommand}
-import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.hadoop.conf.Configuration
@@ -39,7 +39,8 @@ trait BackendSettingsApi {
def validateScanExec(
format: ReadFileFormat,
- fields: Array[StructField],
+ fields: Array[StructField], // the fields to be output
+ dataSchema: StructType, // the schema of the table
rootPaths: Seq[String],
properties: Map[String, String],
hadoopConf: Configuration): ValidationResult =
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 0a19d2207d..34f7f41b78 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -34,6 +34,7 @@ trait IteratorApi {
def genSplitInfo(
partition: InputPartition,
partitionSchema: StructType,
+ dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo
@@ -42,6 +43,7 @@ trait IteratorApi {
partitionIndex: Int,
partition: Seq[InputPartition],
partitionSchema: StructType,
+ dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = throw new
UnsupportedOperationException()
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 7967122424..a8524d53f2 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -27,7 +27,6 @@ import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import com.google.protobuf.StringValue
@@ -68,28 +67,18 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
.genSplitInfo(
_,
getPartitionSchema,
+ getDataSchema,
fileFormat,
getMetadataColumns().map(_.name),
getProperties))
}
override protected def doValidateInternal(): ValidationResult = {
- var fields = schema.fields
-
- this match {
- case transformer: FileSourceScanExecTransformer =>
- fields = appendStringFields(transformer.relation.schema, fields)
- case transformer: HiveTableScanExecTransformer =>
- fields = appendStringFields(transformer.getDataSchema, fields)
- case transformer: BatchScanExecTransformer =>
- fields = appendStringFields(transformer.getDataSchema, fields)
- case _ =>
- }
-
val validationResult = BackendsApiManager.getSettings
.validateScanExec(
fileFormat,
- fields,
+ schema.fields,
+ getDataSchema,
getRootFilePaths,
getProperties,
sparkContext.hadoopConfiguration)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index ae13078f99..03a840cccd 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -139,6 +139,7 @@ abstract class BatchScanExecTransformerBase(
index,
partitions,
getPartitionSchema,
+ getDataSchema,
fileFormat,
getMetadataColumns().map(_.name),
getProperties)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]