alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1030020375
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -347,6 +385,91 @@ object HoodieSparkSqlWriter {
}
}
+ /**
+ * Deduces writer's schema based on
+ * <ul>
+ * <li>Source's schema</li>
+ * <li>Target table's schema (including Hudi's [[InternalSchema]]
representation)</li>
+ * </ul>
+ */
+ def deduceWriterSchema(sourceSchema: Schema,
+ latestTableSchemaOpt: Option[Schema],
+ internalSchemaOpt: Option[InternalSchema],
+ opts: Map[String, String]): Schema = {
+ val shouldReconcileSchema =
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+ val shouldValidateSchemasCompatibility =
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+ HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+
+ latestTableSchemaOpt match {
+ // In case table schema is empty we're just going to use the source
schema as a
+ // writer's schema. No additional handling is required
+ case None => sourceSchema
+ // Otherwise, we need to make sure we reconcile incoming and latest
table schemas
+ case Some(latestTableSchema) =>
+ // Before validating whether schemas are compatible, we need to
"canonicalize" source's schema
+ // relative to the table's one, by doing a (minor) reconciliation of
the nullability constraints:
+ // for ex, if in incoming schema column A is designated as non-null,
but it's designated as nullable
+ // in the table's one we want to proceed aligning nullability
constraints w/ the table's schema
+ val shouldCanonicalizeSchema =
opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,
+
DataSourceWriteOptions.CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
+ val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
+ AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema,
latestTableSchema)
+ } else {
+ sourceSchema
+ }
+
+ if (shouldReconcileSchema) {
+ internalSchemaOpt match {
+ case Some(internalSchema) =>
+ // Apply schema evolution, by auto-merging write schema and read
schema
+ val mergedInternalSchema =
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
internalSchema)
+ AvroInternalSchemaConverter.convert(mergedInternalSchema,
latestTableSchema.getFullName)
+
+ case None =>
+ // In case schema reconciliation is enabled we will employ
(legacy) reconciliation
+ // strategy to produce target writer's schema (see definition
below)
+ val (reconciledSchema, isCompatible) =
reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)
Review Comment:
@nsivabalan this is the change we discussed yesterday: restoring the previous
(legacy) semantics of always picking the "wider" schema
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]