voonhous commented on code in PR #18961:
URL: https://github.com/apache/hudi/pull/18961#discussion_r3414968838
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -56,12 +56,41 @@ trait HoodieHadoopFsRelationFactory {
def buildOptions(): Map[String, String]
}
+object HoodieBaseHadoopFsRelationFactory {
+ /**
+ * Resolves the variant allow-reading-shredded value using the precedence:
+ * table option > hoodie session conf > explicit Spark conf > Hudi default.
+ */
+ private[hudi] def resolveVariantAllowReadingShredded(tableOption: Option[String],
+                                                      hoodieSessionValue: Option[String],
+                                                      sparkConfValue: Option[String],
+                                                      hudiDefault: String): String =
+   tableOption.orElse(hoodieSessionValue).orElse(sparkConfValue).getOrElse(hudiDefault)
+}
+
abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
                                                 val metaClient: HoodieTableMetaClient,
                                                 val options: Map[String, String],
                                                 val schemaSpec: Option[StructType],
                                                 val isBootstrap: Boolean
                                                ) extends SparkAdapterSupport
  with HoodieHadoopFsRelationFactory with Logging {
+  // Propagate Hudi's variant allow-reading-shredded config to Spark's SQLConf.
+  // ParquetToSparkSchemaConverter reads this from SQLConf.get(), so it must be set
+  // before query execution starts here during table resolution
+  if (HoodieSparkUtils.gteqSpark4_0) {
+    val sqlConf = sqlContext.sparkSession.sessionState.conf
+    val hoodieConfKey = HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key
+    // Literal, not SQLConf.VARIANT_ALLOW_READING_SHREDDED.key: that field is absent when this module compiles against Spark 3.x.
+    val sparkConfKey = "spark.sql.variant.allowReadingShredded"
+    // Precedence: table option > hoodie session key > explicit Spark conf > Hudi default.
+    val allowReadingShredded = HoodieBaseHadoopFsRelationFactory.resolveVariantAllowReadingShredded(
+      options.get(hoodieConfKey),
+      if (sqlConf.contains(hoodieConfKey)) Some(sqlConf.getConfString(hoodieConfKey)) else None,
+      if (sqlConf.contains(sparkConfKey)) Some(sqlConf.getConfString(sparkConfKey)) else None,
+      HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.defaultValue.toString)
+    sqlConf.setConfString(sparkConfKey, allowReadingShredded)
Review Comment:
The flagged code comes from the stacked base PR #18065, and it's fixed there:
I dropped the session-conf mutation rather than scoping it.
`ParquetToSparkSchemaConverter` reads `VARIANT_ALLOW_READING_SHREDDED` only via
`SQLConf.get` (Spark 4.0.2 and 4.1.1), and the Hudi and Spark defaults are both
`true`. Shredded reads therefore work without touching the session conf, and an
explicitly set `spark.sql.variant.allowReadingShredded` is honored directly by
the converter. The Hadoop-conf push in `Spark40/41ParquetReader` turned out to
be dead code (nothing in the read path reads that key from the conf), so it's
removed too. The AVRO read path is unaffected; it keeps its own per-read flag
via `VariantReconstruction`.
I will rebase this PR onto #18065 once that lands.
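To illustrate the reasoning above, here is a minimal sketch (not the code in
#18065) of what the converter effectively sees once the session-conf write is
gone: the explicitly set Spark key if present, otherwise the default. The
object `ShreddedReadSketch`, the helper `effectiveAllowReadingShredded`, and
the plain `Map` standing in for `SQLConf` are all hypothetical; the default of
`true` is assumed from the statement above.

```scala
object ShreddedReadSketch {
  // Key name as it appears in the quoted diff.
  val SparkConfKey: String = "spark.sql.variant.allowReadingShredded"

  // Assumption: both the Hudi and Spark defaults are `true`, per the comment above.
  val DefaultValue: String = "true"

  // Hypothetical helper. `explicitConf` stands in for the explicitly set
  // session confs; nothing is written back, the value is only read.
  def effectiveAllowReadingShredded(explicitConf: Map[String, String]): Boolean =
    explicitConf.getOrElse(SparkConfKey, DefaultValue).toBoolean
}
```

With no explicit setting the sketch falls back to the default, and an explicit
`false` wins, which matches the behavior described above without mutating any
session state.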