nsivabalan commented on code in PR #9743:
URL: https://github.com/apache/hudi/pull/9743#discussion_r1379422146
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -111,18 +117,22 @@ public static InternalSchema reconcileSchema(Schema
incomingSchema, InternalSche
return
SchemaChangeUtils.applyTableChanges2Schema(internalSchemaAfterAddColumns,
typeChange);
}
+ public static Schema reconcileSchema(Schema incomingSchema, Schema
oldTableSchema) {
+ return convert(reconcileSchema(incomingSchema, convert(oldTableSchema)),
oldTableSchema.getFullName());
+ }
+
/**
- * Reconciles nullability requirements b/w {@code source} and {@code target}
schemas,
+ * Reconciles nullability and datatype requirements b/w {@code source} and
{@code target} schemas,
* by adjusting these of the {@code source} schema to be in-line with the
ones of the
* {@code target} one
*
* @param sourceSchema source schema that needs reconciliation
* @param targetSchema target schema that source schema will be reconciled
against
- * @param opts config options
- * @return schema (based off {@code source} one) that has nullability
constraints reconciled
+ * @param opts config options
+ * @return schema (based off {@code source} one) that has nullability
constraints and datatypes reconciled
*/
- public static Schema reconcileNullability(Schema sourceSchema, Schema
targetSchema, Map<String, String> opts) {
- if (sourceSchema.getFields().isEmpty() ||
targetSchema.getFields().isEmpty()) {
+ public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema
targetSchema, Map<String, String> opts) {
+ if (sourceSchema.getType() == Schema.Type.NULL ||
sourceSchema.getFields().isEmpty() || targetSchema.getFields().isEmpty()) {
Review Comment:
   If the source schema's fields are empty, shouldn't we be returning targetSchema?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -145,6 +145,13 @@ object HoodieSparkSqlWriter {
new HoodieSparkSqlWriterInternal().deduceWriterSchema(sourceSchema,
latestTableSchemaOpt, internalSchemaOpt, opts)
}
+ def deduceWriterSchema(sourceSchema: Schema,
Review Comment:
   If it's static, shouldn't we move it to object HoodieSparkSqlWriter?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -624,17 +635,25 @@ class HoodieSparkSqlWriterInternal {
} else {
if (!shouldValidateSchemasCompatibility) {
// if no validation is enabled, check for col drop
- // if col drop is allowed, go ahead. if not, check for
projection, so that we do not allow dropping cols
- if (allowAutoEvolutionColumnDrop ||
canProject(latestTableSchema, canonicalizedSourceSchema)) {
+ if (allowAutoEvolutionColumnDrop) {
canonicalizedSourceSchema
} else {
- log.error(
- s"""Incoming batch schema is not compatible with the table's
one.
- |Incoming schema ${sourceSchema.toString(true)}
- |Incoming schema (canonicalized)
${canonicalizedSourceSchema.toString(true)}
- |Table's schema ${latestTableSchema.toString(true)}
- |""".stripMargin)
- throw new SchemaCompatibilityException("Incoming batch schema
is not compatible with the table's one")
+ val reconciledSchema = if (addNullForDeletedColumns) {
+
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
latestTableSchema)
+ } else {
+ canonicalizedSourceSchema
+ }
+ if (isValidEvolutionOf(reconciledSchema, latestTableSchema)) {
+ reconciledSchema
+ } else {
+ log.error(
+ s"""Incoming batch schema is not compatible with the
table's one.
+ |Incoming schema ${sourceSchema.toString(true)}
+ |Incoming schema (canonicalized)
${reconciledSchema.toString(true)}
+ |Table's schema ${latestTableSchema.toString(true)}
+ |""".stripMargin)
+ throw new SchemaCompatibilityException("Incoming batch
schema is not compatible with the table's one")
+ }
Review Comment:
   Also, can you shed some light on why we don't call
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
latestTableSchema)
in the else block at L658?
   i.e., when reconcile schema is set to false and AVRO_SCHEMA_VALIDATE_ENABLE
is set to true, it looks like we never call
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
latestTableSchema).
   Also, curious to know the difference between
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
latestTableSchema) and HoodieSparkSqlWriter.canonicalizeSchema(),
because both take the source schema and the table schema as arguments.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -624,17 +635,25 @@ class HoodieSparkSqlWriterInternal {
} else {
if (!shouldValidateSchemasCompatibility) {
// if no validation is enabled, check for col drop
- // if col drop is allowed, go ahead. if not, check for
projection, so that we do not allow dropping cols
- if (allowAutoEvolutionColumnDrop ||
canProject(latestTableSchema, canonicalizedSourceSchema)) {
+ if (allowAutoEvolutionColumnDrop) {
canonicalizedSourceSchema
} else {
- log.error(
- s"""Incoming batch schema is not compatible with the table's
one.
- |Incoming schema ${sourceSchema.toString(true)}
- |Incoming schema (canonicalized)
${canonicalizedSourceSchema.toString(true)}
- |Table's schema ${latestTableSchema.toString(true)}
- |""".stripMargin)
- throw new SchemaCompatibilityException("Incoming batch schema
is not compatible with the table's one")
+ val reconciledSchema = if (addNullForDeletedColumns) {
+
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
latestTableSchema)
+ } else {
+ canonicalizedSourceSchema
+ }
+ if (isValidEvolutionOf(reconciledSchema, latestTableSchema)) {
Review Comment:
   Looks like even if shouldValidateSchemasCompatibility
(AVRO_SCHEMA_VALIDATE_ENABLE, i.e. hoodie.avro.schema.validate) is set to
false, we still check for valid schema evolution here.
   May I know why?
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -41,18 +42,23 @@ public class AvroSchemaEvolutionUtils {
* 1) incoming data has missing columns that were already defined in the
table –> null values will be injected into missing columns
* 2) incoming data contains new columns not defined yet in the table ->
columns will be added to the table schema (incoming dataframe?)
* 3) incoming data has missing columns that are already defined in the
table and new columns not yet defined in the table ->
- * new columns will be added to the table schema, missing columns will
be injected with null values
+ * new columns will be added to the table schema, missing columns will be
injected with null values
* 4) support type change
* 5) support nested schema change.
* Notice:
- * the incoming schema should not have delete/rename semantics.
- * for example: incoming schema: int a, int b, int d; oldTableSchema
int a, int b, int c, int d
- * we must guarantee the column c is missing semantic, instead of delete
semantic.
+ * the incoming schema should not have delete/rename semantics.
+ * for example: incoming schema: int a, int b, int d; oldTableSchema int
a, int b, int c, int d
+ * we must guarantee the column c is missing semantic, instead of delete
semantic.
+ *
* @param incomingSchema implicitly evolution of avro when hoodie write
operation
* @param oldTableSchema old internalSchema
* @return reconcile Schema
*/
public static InternalSchema reconcileSchema(Schema incomingSchema,
InternalSchema oldTableSchema) {
+ /* If incoming schema is null, we fall back on table schema. */
Review Comment:
   We should try to write a lot of UTs for this method; that's the core. Then
mostly only wiring is required from outside.
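As a starting point for those UTs, here is a toy sketch of the reconciliation semantics described in the javadoc above (cases 1-3). Schemas are modeled as ordered name -> type maps rather than real Avro/InternalSchema objects, so `reconcile` below is an illustrative stand-in for `AvroSchemaEvolutionUtils.reconcileSchema`, not the actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the reconciliation semantics from the javadoc:
// incoming fields (including brand-new ones) are kept, and table columns
// missing from the incoming batch are re-added as nullable, so that "missing"
// is not interpreted as "deleted".
public class ReconcileSketch {

  static Map<String, String> reconcile(Map<String, String> incoming,
                                       Map<String, String> table) {
    Map<String, String> result = new LinkedHashMap<>(incoming);
    for (Map.Entry<String, String> f : table.entrySet()) {
      // a column present in the table but absent from the batch survives,
      // made nullable so null values can be injected for it
      result.putIfAbsent(f.getKey(), "nullable " + f.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    // the javadoc's example: incoming a, b, d; table a, b, c, d
    Map<String, String> incoming = new LinkedHashMap<>();
    incoming.put("a", "int");
    incoming.put("b", "int");
    incoming.put("d", "int");

    Map<String, String> table = new LinkedHashMap<>();
    table.put("a", "int");
    table.put("b", "int");
    table.put("c", "int");
    table.put("d", "int");

    // column c is treated as "missing", so it survives as nullable
    System.out.println(reconcile(incoming, table));
    // prints {a=int, b=int, d=int, c=nullable int}
  }
}
```

Real tests should of course exercise `org.apache.avro.Schema` instances, including the type-change and nested-schema cases (4 and 5).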
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
opts: Map[String, String]): Schema = {
+ val addNullForDeletedColumns =
opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),
Review Comment:
   We definitely need good docs for this method; let's enhance the docs at
L545. Even having illustrative examples is totally fine. We have been soft and
low-key on schema evolution in general; let's button up and ensure we get it
right this time.
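One way to make such docs concrete: the non-validating branch shown elsewhere in this PR (when shouldValidateSchemasCompatibility is false) can be summarized as a small decision function. This is only a paraphrase of the diff for illustration, with schemas as stand-in strings and isValidEvolutionOf stubbed as a boolean parameter, not the real code:

```java
// Hedged sketch of the writer-schema decision flow discussed in this thread,
// for the branch where shouldValidateSchemasCompatibility is false.
public class WriterSchemaFlowSketch {

  static String deduce(boolean allowAutoEvolutionColumnDrop,
                       boolean addNullForDeletedColumns,
                       boolean isValidEvolutionOfTable) {
    if (allowAutoEvolutionColumnDrop) {
      // column drops are allowed, so the (canonicalized) source wins as-is
      return "canonicalizedSourceSchema";
    }
    // otherwise, optionally re-add dropped columns as null before checking
    String candidate = addNullForDeletedColumns
        ? "reconciledSchema"
        : "canonicalizedSourceSchema";
    if (isValidEvolutionOfTable) {
      return candidate;
    }
    // mirrors the SchemaCompatibilityException in the diff
    throw new IllegalStateException(
        "Incoming batch schema is not compatible with the table's one");
  }

  public static void main(String[] args) {
    System.out.println(deduce(false, true, true)); // prints reconciledSchema
  }
}
```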
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java:
##########
@@ -68,6 +68,27 @@ public static Schema convert(InternalSchema internalSchema,
String name) {
return buildAvroSchemaFromInternalSchema(internalSchema, name);
}
+  /**
+   * Converting from avro -> internal schema -> avro causes null to always
+   * be first in unions. If we compare a schema that has not been converted
+   * to internal schema at any stage, the difference in ordering can cause
+   * issues. To resolve this, we order null to be first for any avro schema
+   * that enters hudi.
+   * AvroSchemaUtils.isProjectionOfInternal uses index-based comparison for
+   * unions. Spark and Flink don't support complex unions, so this would not
+   * be an issue, but for the metadata table HoodieMetadata.avsc uses a trick
+   * where a bunch of different types are wrapped in a record for col stats.
+   *
+   * @param schema avro schema.
+   * @return an avro Schema where null is first.
+   */
+ public static Schema fixNullOrdering(Schema schema) {
+ if (schema.getType() == Schema.Type.NULL) {
+ return schema;
+ }
+ return convert(convert(schema), schema.getFullName());
Review Comment:
   Curious to know why we need two conversions. Can't we directly fix or
create the Avro schema by fixing the null ordering?
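For reference, the direct fix suggested here could look roughly like the following. Unions are modeled as plain lists of type names; a real implementation would have to rebuild `org.apache.avro.Schema` union instances recursively (through records, arrays, and maps) and keep field defaults consistent with the new first branch, which is presumably why reusing the existing two-way conversion was simpler:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of reordering "null" to the front of a union directly, without the
// avro -> internal schema -> avro round trip. Unions are modeled as lists of
// type names; this is an illustration, not the Hudi implementation.
public class NullOrderingSketch {

  static List<String> nullFirst(List<String> unionTypes) {
    List<String> reordered = new ArrayList<>(unionTypes);
    if (reordered.remove("null")) {
      // null moves to index 0 so index-based union comparisons line up
      reordered.add(0, "null");
    }
    return reordered;
  }

  public static void main(String[] args) {
    System.out.println(nullFirst(List.of("string", "null"))); // prints [null, string]
    System.out.println(nullFirst(List.of("int")));            // prints [int]
  }
}
```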
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -173,7 +175,12 @@ private RecordIterator(Schema readerSchema, Schema
writerSchema, byte[] content)
this.totalRecords = this.dis.readInt();
}
- this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
+ if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema,
readerSchema)) {
Review Comment:
   Don't we need a similar fix in other data blocks, e.g. parquet data
blocks?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -173,7 +175,12 @@ private RecordIterator(Schema readerSchema, Schema
writerSchema, byte[] content)
this.totalRecords = this.dis.readInt();
}
- this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
+ if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema,
readerSchema)) {
Review Comment:
   So, in case we route the records to the create handle or merge handle, can
you point me to the code where this promotion happens?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
opts: Map[String, String]): Schema = {
+ val addNullForDeletedColumns =
opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),
+
DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.defaultValue).toBoolean
val shouldReconcileSchema =
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val shouldValidateSchemasCompatibility =
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
latestTableSchemaOpt match {
// In case table schema is empty we're just going to use the source
schema as a
- // writer's schema. No additional handling is required
- case None => sourceSchema
+ // writer's schema.
+ case None => AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
// Otherwise, we need to make sure we reconcile incoming and latest
table schemas
case Some(latestTableSchemaWithMetaFields) =>
// NOTE: Meta-fields will be unconditionally injected by Hudi writing
handles, for the sake of
// deducing proper writer schema we're stripping them to make
sure we can perform proper
// analysis
- val latestTableSchema =
removeMetadataFields(latestTableSchemaWithMetaFields)
+ //add call to fix null ordering to ensure backwards compatibility
+ val latestTableSchema =
AvroInternalSchemaConverter.fixNullOrdering(removeMetadataFields(latestTableSchemaWithMetaFields))
// Before validating whether schemas are compatible, we need to
"canonicalize" source's schema
// relative to the table's one, by doing a (minor) reconciliation of
the nullability constraints:
// for ex, if in incoming schema column A is designated as non-null,
but it's designated as nullable
// in the table's one we want to proceed aligning nullability
constraints w/ the table's schema
- val shouldCanonicalizeNullable =
opts.getOrDefault(CANONICALIZE_NULLABLE.key,
- CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
+ // Also, we promote types to the latest table schema if possible.
+ val shouldCanonicalizeSchema =
opts.getOrDefault(CANONICALIZE_SCHEMA.key,
+ CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
- val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
+ val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
Review Comment:
   Will removing canonicalize be a follow-up patch?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
opts: Map[String, String]): Schema = {
+ val addNullForDeletedColumns =
opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),
Review Comment:
   The schema handling has grown to be sizable now. Let's move it to a
separate static class instead of adding everything to HoodieSparkSqlWriter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]