This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2e39bfb6940 [HUDI-6947] Refactored
HoodieSchemaUtils.deduceWriterSchema with many flags (#10810)
2e39bfb6940 is described below
commit 2e39bfb694099293b77eec9977e5e46af97af18b
Author: Geser Dugarov <[email protected]>
AuthorDate: Thu Mar 7 12:23:38 2024 +0700
[HUDI-6947] Refactored HoodieSchemaUtils.deduceWriterSchema with many flags
(#10810)
---
.../scala/org/apache/hudi/HoodieSchemaUtils.scala | 176 +++++++++++----------
1 file changed, 93 insertions(+), 83 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index cfc43453e9c..9aeff64f237 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -76,107 +76,117 @@ object HoodieSchemaUtils {
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
opts: Map[String, String]): Schema = {
- val setNullForMissingColumns =
opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
-
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean
- val shouldReconcileSchema =
opts.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(),
-
DataSourceWriteOptions.RECONCILE_SCHEMA.defaultValue().toString).toBoolean
- val shouldValidateSchemasCompatibility =
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
- HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
-
latestTableSchemaOpt match {
- // In case table schema is empty we're just going to use the source
schema as a
- // writer's schema.
+ // If table schema is empty, then we use the source schema as a writer's
schema.
case None => AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
// Otherwise, we need to make sure we reconcile incoming and latest
table schemas
case Some(latestTableSchemaWithMetaFields) =>
- // NOTE: Meta-fields will be unconditionally injected by Hudi writing
handles, for the sake of
- // deducing proper writer schema we're stripping them to make
sure we can perform proper
- // analysis
- //add call to fix null ordering to ensure backwards compatibility
+ // NOTE: Meta-fields will be unconditionally injected by Hudi writing
handles, for the sake of deducing proper writer schema
+ // we're stripping them to make sure we can perform proper
analysis
+ // add call to fix null ordering to ensure backwards compatibility
val latestTableSchema =
AvroInternalSchemaConverter.fixNullOrdering(removeMetadataFields(latestTableSchemaWithMetaFields))
+
// Before validating whether schemas are compatible, we need to
"canonicalize" source's schema
// relative to the table's one, by doing a (minor) reconciliation of
the nullability constraints:
// for ex, if in incoming schema column A is designated as non-null,
but it's designated as nullable
// in the table's one we want to proceed aligning nullability
constraints w/ the table's schema
// Also, we promote types to the latest table schema if possible.
- val shouldCanonicalizeSchema =
opts.getOrDefault(CANONICALIZE_SCHEMA.key,
- CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
- val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
- SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
-
+ val shouldCanonicalizeSchema =
opts.getOrDefault(CANONICALIZE_SCHEMA.key,
CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
canonicalizeSchema(sourceSchema, latestTableSchema, opts)
} else {
AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
}
- val allowAutoEvolutionColumnDrop =
opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
-
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
-
+ val shouldReconcileSchema =
opts.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(),
+
DataSourceWriteOptions.RECONCILE_SCHEMA.defaultValue().toString).toBoolean
if (shouldReconcileSchema) {
- internalSchemaOpt match {
- case Some(internalSchema) =>
- // Apply schema evolution, by auto-merging write schema and read
schema
- val mergedInternalSchema =
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
internalSchema)
- val evolvedSchema =
AvroInternalSchemaConverter.convert(mergedInternalSchema,
latestTableSchema.getFullName)
- val shouldRemoveMetaDataFromInternalSchema =
sourceSchema.getFields().filter(f =>
f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
- if (shouldRemoveMetaDataFromInternalSchema)
HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
- case None =>
- // In case schema reconciliation is enabled we will employ
(legacy) reconciliation
- // strategy to produce target writer's schema (see definition
below)
- val (reconciledSchema, isCompatible) =
- reconcileSchemasLegacy(latestTableSchema,
canonicalizedSourceSchema)
-
- // NOTE: In some cases we need to relax constraint of incoming
dataset's schema to be compatible
- // w/ the table's one and allow schemas to diverge. This
is required in cases where
- // partial updates will be performed (for ex, `MERGE INTO`
Spark SQL statement) and as such
- // only incoming dataset's projection has to match the
table's schema, and not the whole one
- if (!shouldValidateSchemasCompatibility || isCompatible) {
- reconciledSchema
- } else {
- log.error(
- s"""Failed to reconcile incoming batch schema with the
table's one.
- |Incoming schema ${sourceSchema.toString(true)}
- |Incoming schema (canonicalized)
${canonicalizedSourceSchema.toString(true)}
- |Table's schema ${latestTableSchema.toString(true)}
- |""".stripMargin)
- throw new SchemaCompatibilityException("Failed to reconcile
incoming schema with the table's one")
- }
- }
+ deduceWriterSchemaWithReconcile(sourceSchema,
canonicalizedSourceSchema, latestTableSchema, internalSchemaOpt, opts)
+ } else {
+ deduceWriterSchemaWithoutReconcile(sourceSchema,
canonicalizedSourceSchema, latestTableSchema, opts)
+ }
+ }
+ }
+
+  /**
+   * Deduces the writer's schema when reconciliation is disabled. Depending on the flags in `opts`, validates that the
+   * source's schema is compatible w/ the table's latest schema, such that we're able to read existing table's records
+   * using [[sourceSchema]]. `MERGE INTO` writes are never validated here; the canonicalized source schema is returned.
+   */
+  private def deduceWriterSchemaWithoutReconcile(sourceSchema: Schema,
+                                                 canonicalizedSourceSchema: Schema,
+                                                 latestTableSchema: Schema,
+                                                 opts: Map[String, String]): Schema = {
+    // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
+    //       w/ the table's one and allow schemas to diverge. This is required in cases where
+    //       partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
+    //       only incoming dataset's projection has to match the table's schema, and not the whole one
+    val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
+    val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+    val allowAutoEvolutionColumnDrop = opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
+      HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
+    val setNullForMissingColumns = opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
+      DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean
+
+    if (!mergeIntoWrites && !shouldValidateSchemasCompatibility && !allowAutoEvolutionColumnDrop) {
+      // Default behaviour: reconcile w/ the table's schema only if SET_NULL_FOR_MISSING_COLUMNS is on, then check evolution
+      val reconciledSchema = if (setNullForMissingColumns) {
+        AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, latestTableSchema)
+      } else {
+        canonicalizedSourceSchema
+      }
+      checkValidEvolution(reconciledSchema, latestTableSchema)
+      reconciledSchema
+    } else {
+      // If it's merge into writes, we don't check for projection nor schema compatibility, even if validation is enabled.
+      // Writers down the line will take care of it. Otherwise (not merge into writes), we either allow to drop columns
+      // automatically w/o validation, or we validate that the source's schema is compatible w/ the table's one.
+      if (!mergeIntoWrites && shouldValidateSchemasCompatibility) {
+        checkSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, true,
+          allowAutoEvolutionColumnDrop, java.util.Collections.emptySet())
+      }
+      canonicalizedSourceSchema
+    }
+  }
+
+ /**
+ * Deducing with enabled reconciliation. Marked as Deprecated (NOTE(review): there is no @deprecated annotation on this method; presumably this refers to the reconciliation option itself, so confirm).
+ * With an internal schema, the canonicalized source schema is merged into it (meta fields are stripped unless the source carries the record-key meta field); otherwise legacy reconciliation is used, throwing [[SchemaCompatibilityException]] when validation is enabled and the result is incompatible.
+ */
+ private def deduceWriterSchemaWithReconcile(sourceSchema: Schema,
+ canonicalizedSourceSchema:
Schema,
+ latestTableSchema: Schema,
+ internalSchemaOpt:
Option[InternalSchema],
+ opts: Map[String, String]):
Schema = {
+ internalSchemaOpt match {
+ case Some(internalSchema) =>
+ // Apply schema evolution, by auto-merging write schema and read schema
+ val mergedInternalSchema =
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
internalSchema)
+ val evolvedSchema =
AvroInternalSchemaConverter.convert(mergedInternalSchema,
latestTableSchema.getFullName)
+ val shouldRemoveMetaDataFromInternalSchema =
sourceSchema.getFields().filter(f =>
f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
+ if (shouldRemoveMetaDataFromInternalSchema)
HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
+ case None =>
+ // In case schema reconciliation is enabled we will employ (legacy)
reconciliation
+ // strategy to produce target writer's schema (see definition below)
+ val (reconciledSchema, isCompatible) =
+ reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)
+
+ // NOTE: In some cases we need to relax constraint of incoming
dataset's schema to be compatible
+ // w/ the table's one and allow schemas to diverge. This is
required in cases where
+ // partial updates will be performed (for ex, `MERGE INTO` Spark
SQL statement) and as such
+ // only incoming dataset's projection has to match the table's
schema, and not the whole one
+ val shouldValidateSchemasCompatibility =
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+ if (!shouldValidateSchemasCompatibility || isCompatible) {
+ reconciledSchema
} else {
- // In case reconciliation is disabled, we have to validate that the
source's schema
- // is compatible w/ the table's latest schema, such that we're able
to read existing table's
- // records using [[sourceSchema]].
- //
- // NOTE: In some cases we need to relax constraint of incoming
dataset's schema to be compatible
- // w/ the table's one and allow schemas to diverge. This is
required in cases where
- // partial updates will be performed (for ex, `MERGE INTO`
Spark SQL statement) and as such
- // only incoming dataset's projection has to match the table's
schema, and not the whole one
-
- if (mergeIntoWrites) {
- // if its merge into writes, do not check for projection nor
schema compatibility. Writers down the line will
- // take care of it.
- canonicalizedSourceSchema
- } else {
- if (!shouldValidateSchemasCompatibility) {
- // if no validation is enabled, check for col drop
- if (allowAutoEvolutionColumnDrop) {
- canonicalizedSourceSchema
- } else {
- val reconciledSchema = if (setNullForMissingColumns) {
-
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
latestTableSchema)
- } else {
- canonicalizedSourceSchema
- }
- checkValidEvolution(reconciledSchema, latestTableSchema)
- reconciledSchema
- }
- } else {
- checkSchemaCompatible(latestTableSchema,
canonicalizedSourceSchema, true,
- allowAutoEvolutionColumnDrop, java.util.Collections.emptySet())
- canonicalizedSourceSchema
- }
- }
+ log.error(
+ s"""Failed to reconcile incoming batch schema with the table's one.
+ |Incoming schema ${sourceSchema.toString(true)}
+ |Incoming schema (canonicalized)
${canonicalizedSourceSchema.toString(true)}
+ |Table's schema ${latestTableSchema.toString(true)}
+ |""".stripMargin)
+ throw new SchemaCompatibilityException("Failed to reconcile incoming
schema with the table's one")
}
}
}