zhztheplayer commented on code in PR #7634:
URL: https://github.com/apache/incubator-gluten/pull/7634#discussion_r1812011582
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala:
##########
@@ -88,78 +88,92 @@ object VeloxBackendSettings extends BackendSettingsApi {
val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX +
".internal.udfLibraryPaths"
val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX +
".udfAllowTypeConversion"
- val MAXIMUM_BATCH_SIZE: Int = 32768
-
- override def validateScan(
+ override def validateScanExec(
format: ReadFileFormat,
fields: Array[StructField],
rootPaths: Seq[String]): ValidationResult = {
- val filteredRootPaths = distinctRootPaths(rootPaths)
- if (
- filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper
- .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
- ) {
- return ValidationResult.failed(
- s"Scheme of [$filteredRootPaths] is not supported by registered file
systems.")
- }
- // Validate if all types are supported.
- def validateTypes(validatorFunc: PartialFunction[StructField, String]):
ValidationResult = {
- // Collect unsupported types.
- val unsupportedDataTypeReason = fields.collect(validatorFunc)
- if (unsupportedDataTypeReason.isEmpty) {
- ValidationResult.succeeded
+
+ def validateScheme(): Option[String] = {
+ val filteredRootPaths = distinctRootPaths(rootPaths)
+ if (
+ filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper
+ .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
+ ) {
+ Some(s"Scheme of [$filteredRootPaths] is not supported by registered
file systems.")
} else {
- ValidationResult.failed(
- s"Found unsupported data type in $format:
${unsupportedDataTypeReason.mkString(", ")}.")
+ None
}
}
- format match {
- case ParquetReadFormat =>
- val typeValidator: PartialFunction[StructField, String] = {
- // Parquet timestamp is not fully supported yet
- case StructField(_, TimestampType, _, _)
- if
GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled =>
- "TimestampType"
- }
- validateTypes(typeValidator)
- case DwrfReadFormat => ValidationResult.succeeded
- case OrcReadFormat =>
- if (!GlutenConfig.getConf.veloxOrcScanEnabled) {
- ValidationResult.failed(s"Velox ORC scan is turned off.")
+ def validateFormat(): Option[String] = {
+ def validateTypes(validatorFunc: PartialFunction[StructField, String]):
Option[String] = {
+ // Collect unsupported types.
+ val unsupportedDataTypeReason = fields.collect(validatorFunc)
+ if (unsupportedDataTypeReason.nonEmpty) {
+ Some(
+ s"Found unsupported data type in $format:
${unsupportedDataTypeReason.mkString(", ")}.")
} else {
+ None
+ }
+ }
+
+ def isCharType(stringType: StringType, metadata: Metadata): Boolean = {
+ val charTypePattern = "char\\((\\d+)\\)".r
+ GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled &&
charTypePattern
+ .findFirstIn(
+ CharVarcharUtils
+ .getRawTypeString(metadata)
+ .getOrElse(stringType.catalogString))
+ .isDefined
+ }
+
+ format match {
+ case ParquetReadFormat =>
val typeValidator: PartialFunction[StructField, String] = {
- case StructField(_, arrayType: ArrayType, _, _)
- if arrayType.elementType.isInstanceOf[StructType] =>
- "StructType as element in ArrayType"
- case StructField(_, arrayType: ArrayType, _, _)
- if arrayType.elementType.isInstanceOf[ArrayType] =>
- "ArrayType as element in ArrayType"
- case StructField(_, mapType: MapType, _, _)
- if mapType.keyType.isInstanceOf[StructType] =>
- "StructType as Key in MapType"
- case StructField(_, mapType: MapType, _, _)
- if mapType.valueType.isInstanceOf[ArrayType] =>
- "ArrayType as Value in MapType"
- case StructField(_, stringType: StringType, _, metadata)
- if isCharType(stringType, metadata) =>
- CharVarcharUtils.getRawTypeString(metadata) + " not support"
- case StructField(_, TimestampType, _, _) => "TimestampType not
support"
+ // Parquet timestamp is not fully supported yet
+ case StructField(_, TimestampType, _, _)
+ if
GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled =>
+ "TimestampType(force fallback)"
}
- validateTypes(typeValidator)
- }
- case _ => ValidationResult.failed(s"Unsupported file format for
$format.")
+ if (SQLConf.get.isParquetSchemaMergingEnabled) {
+ // https://github.com/apache/incubator-gluten/issues/7174
+ Some(s"not support when
${SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key} is true")
+ } else {
+ validateTypes(typeValidator)
+ }
Review Comment:
Looking at [this Spark
code](https://github.com/apache/spark/blob/51e915a0ca2d3f1c9d8337d1ee9692247026c40f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala#L69-L72),
it appears that the session config `SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key` can be
overridden dynamically per-read via the file read options. Could we check the final
overridden value of `ParquetOptions.mergeSchema` here instead of reading the session config directly?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]