alexeykudinkin commented on code in PR #7333:
URL: https://github.com/apache/hudi/pull/7333#discussion_r1035446772
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -90,10 +90,14 @@ class IncrementalRelation(val sqlContext: SQLContext,
val (usedSchema, internalSchema) = {
log.info("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
- val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) {
-
InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong,
metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
+ val iSchema = if (isSchemaEvolutionEnabledOnRead) {
+ if (useEndInstantSchema && !commitsToReturn.isEmpty) {
+
InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong,
metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
+ } else {
+ schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
+ }
} else {
- schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
+ Option.empty.asInstanceOf[InternalSchema]
Review Comment:
Should be `Option.empty[InternalSchema]`
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -200,16 +198,20 @@ object HoodieSparkSqlWriter {
.getOrElse(getAvroRecordNameAndNamespace(tblName))
val sourceSchema = convertStructTypeToAvroSchema(df.schema,
avroRecordName, avroRecordNamespace)
- val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath,
sparkContext).orElse {
- val schemaEvolutionEnabled =
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
- // In case we need to reconcile the schema and schema evolution is
enabled,
- // we will force-apply schema evolution to the writer's schema
- if (shouldReconcileSchema && schemaEvolutionEnabled) {
- Some(AvroInternalSchemaConverter.convert(sourceSchema))
- } else {
- None
+ val schemaEvolutionEnabled =
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
Review Comment:
Let's actually hide this check inside `getLatestTableInternalSchema`
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -249,7 +251,13 @@ object HoodieSparkSqlWriter {
}
// Create a HoodieWriteClient & issue the delete.
- val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath,
sparkContext)
+ // Create a HoodieWriteClient & issue the delete.
+ val schemaEvolutionEnabled =
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(),
"false").toBoolean
+ val internalSchemaOpt :
Option[org.apache.hudi.internal.schema.InternalSchema]= if
(schemaEvolutionEnabled) {
+ getLatestTableInternalSchema(fs, basePath, sparkContext)
Review Comment:
Please check the comment above
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -90,10 +90,14 @@ class IncrementalRelation(val sqlContext: SQLContext,
val (usedSchema, internalSchema) = {
log.info("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
- val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) {
-
InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong,
metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
+ val iSchema = if (isSchemaEvolutionEnabledOnRead) {
Review Comment:
Let's instead shape it as an else-if chain:
```
if (!evolutionEnabled) {
None
} else if (...) ...
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -121,6 +125,13 @@ class IncrementalRelation(val sqlContext: SQLContext,
override def schema: StructType = usedSchema
+ private def isSchemaEvolutionEnabledOnRead = {
+ optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
Review Comment:
Let's abstract this config retrieval into a common method (we can reuse it in
other places as well)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]