the-other-tim-brown commented on code in PR #14374:
URL: https://github.com/apache/hudi/pull/14374#discussion_r2578352901
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -108,20 +108,20 @@ object HoodieSchemaUtils {
* <li>Target table's schema (including Hudi's [[InternalSchema]]
representation)</li>
* </ul>
*/
- def deduceWriterSchema(sourceSchema: Schema,
- latestTableSchemaOpt: Option[Schema],
+ def deduceWriterSchema(sourceSchema: HoodieSchema,
+ latestTableSchemaOpt: Option[HoodieSchema],
internalSchemaOpt: Option[InternalSchema],
- opts: Map[String, String]): Schema = {
+ opts: Map[String, String]): HoodieSchema = {
latestTableSchemaOpt match {
// If table schema is empty, then we use the source schema as a writer's
schema.
- case None =>
InternalSchemaConverter.fixNullOrdering(HoodieSchema.fromAvroSchema(sourceSchema)).toAvroSchema
+ case None => InternalSchemaConverter.fixNullOrdering(sourceSchema)
// Otherwise, we need to make sure we reconcile incoming and latest
table schemas
case Some(latestTableSchemaWithMetaFields) =>
// NOTE: Meta-fields will be unconditionally injected by Hudi writing
handles, for the sake of deducing proper writer schema
// we're stripping them to make sure we can perform proper
analysis
// add call to fix null ordering to ensure backwards compatibility
val latestTableSchema =
InternalSchemaConverter.fixNullOrdering(HoodieSchema.fromAvroSchema(
- removeMetadataFields(latestTableSchemaWithMetaFields))).toAvroSchema
+
removeMetadataFields(latestTableSchemaWithMetaFields.getAvroSchema))).toAvroSchema
Review Comment:
```suggestion
val latestTableSchema = InternalSchemaConverter.fixNullOrdering(
org.apache.hudi.common.schema.HoodieSchemaUtils.removeMetadataFields(latestTableSchemaWithMetaFields))
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -169,18 +169,18 @@ object HoodieSchemaUtils {
if (!mergeIntoWrites && !shouldValidateSchemasCompatibility &&
!allowAutoEvolutionColumnDrop) {
// Default behaviour
val reconciledSchema = if (setNullForMissingColumns) {
- AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
latestTableSchema, setNullForMissingColumns)
+
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(),
latestTableSchema.toAvroSchema(), setNullForMissingColumns)
} else {
- canonicalizedSourceSchema
+ canonicalizedSourceSchema.toAvroSchema()
}
- checkValidEvolution(reconciledSchema, latestTableSchema)
- reconciledSchema
+ checkValidEvolution(reconciledSchema, latestTableSchema.toAvroSchema())
Review Comment:
Use `HoodieSchemaCompatibility.checkValidEvolution` here
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -146,7 +146,7 @@ object HoodieSparkSqlWriter {
Metrics.shutdownAllMetrics()
}
- def getBulkInsertRowConfig(writerSchema:
org.apache.hudi.common.util.Option[Schema], hoodieConfig: HoodieConfig,
+ def getBulkInsertRowConfig(writerSchema:
org.apache.hudi.common.util.Option[HoodieSchema ], hoodieConfig: HoodieConfig,
Review Comment:
nit: trailing space after `HoodieSchema`. Please fix this throughout the file
##########
hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieSchemaUtils.java:
##########
@@ -325,8 +326,19 @@ private static Schema deduceWriterSchema(Schema
incomingSchema, Schema latestTab
private static Schema deduceWriterSchema(Schema incomingSchema, Schema
latestTableSchema, Boolean addNull) {
TYPED_PROPERTIES.setProperty(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(),
addNull.toString());
- return HoodieSchemaUtils.deduceWriterSchema(incomingSchema,
Option.ofNullable(latestTableSchema),
- Option.empty(), TYPED_PROPERTIES);
+
+ // Convert latestTableSchema to Option<HoodieSchema>
+ Option<HoodieSchema> latestTableSchemaOpt = latestTableSchema != null
+ ? Option.of(HoodieSchema.fromAvroSchema(latestTableSchema))
+ : Option.empty();
Review Comment:
```suggestion
Option<HoodieSchema> latestTableSchemaOpt =
Option.ofNullable(latestTableSchema).map(HoodieSchema::fromAvroSchema);
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -230,16 +232,27 @@ object HoodieSchemaUtils {
}
}
- def deduceWriterSchema(sourceSchema: Schema,
- latestTableSchemaOpt:
org.apache.hudi.common.util.Option[Schema],
+ def deduceWriterSchema(sourceSchema: HoodieSchema,
+ latestTableSchemaOpt:
org.apache.hudi.common.util.Option[HoodieSchema],
internalSchemaOpt:
org.apache.hudi.common.util.Option[InternalSchema],
- props: TypedProperties): Schema = {
+ props: TypedProperties): HoodieSchema = {
deduceWriterSchema(sourceSchema,
HoodieConversionUtils.toScalaOption(latestTableSchemaOpt),
HoodieConversionUtils.toScalaOption(internalSchemaOpt),
HoodieConversionUtils.fromProperties(props))
}
+ /**
+ * Converts an Option of Avro Schema to an Option of HoodieSchema.
+ * This is a convenience method for Java callers working with Hudi's Option
type.
+ *
+ * @param avroSchemaOpt Optional Avro Schema (Hudi's Option type)
+ * @return Optional HoodieSchema (Hudi's Option type)
+ */
+ def toHoodieSchemaOption(avroSchemaOpt:
org.apache.hudi.common.util.Option[Schema]):
org.apache.hudi.common.util.Option[HoodieSchema] = {
Review Comment:
This looks like it is only used once; can it be inlined instead?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -191,33 +191,35 @@ object HoodieSchemaUtils {
* Deducing with enabled reconciliation.
* Marked as Deprecated.
*/
- private def deduceWriterSchemaWithReconcile(sourceSchema: Schema,
- canonicalizedSourceSchema:
Schema,
+ private def deduceWriterSchemaWithReconcile(sourceSchema: HoodieSchema,
+ canonicalizedSourceSchema:
HoodieSchema,
latestTableSchema: Schema,
Review Comment:
Can we change latestTableSchema to be a HoodieSchema now?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -169,18 +169,18 @@ object HoodieSchemaUtils {
if (!mergeIntoWrites && !shouldValidateSchemasCompatibility &&
!allowAutoEvolutionColumnDrop) {
// Default behaviour
val reconciledSchema = if (setNullForMissingColumns) {
- AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
latestTableSchema, setNullForMissingColumns)
+
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(),
latestTableSchema.toAvroSchema(), setNullForMissingColumns)
} else {
- canonicalizedSourceSchema
+ canonicalizedSourceSchema.toAvroSchema()
}
- checkValidEvolution(reconciledSchema, latestTableSchema)
- reconciledSchema
+ checkValidEvolution(reconciledSchema, latestTableSchema.toAvroSchema())
+ HoodieSchema.fromAvroSchema(reconciledSchema)
} else {
// If it's merge into writes, we don't check for projection nor schema
compatibility. Writers down the line will take care of it.
// Or it's not merge into writes, and we don't validate schema, but we
allow to drop columns automatically.
// Or it's not merge into writes, we validate schema, and schema is
compatible.
if (shouldValidateSchemasCompatibility) {
- checkSchemaCompatible(latestTableSchema, canonicalizedSourceSchema,
true,
+ checkSchemaCompatible(latestTableSchema.toAvroSchema(),
canonicalizedSourceSchema.toAvroSchema(), true,
Review Comment:
Similarly use `HoodieSchemaCompatibility.checkSchemaCompatible`
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -365,16 +365,25 @@ class HoodieSparkSqlWriterInternal {
// NOTE: We need to make sure that upon conversion of the schemas b/w
Catalyst's [[StructType]] and
// Avro's [[Schema]] we're preserving corresponding "record-name"
and "record-namespace" that
// play crucial role in establishing compatibility b/w schemas
- val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s
=> (s.getName, s.getNamespace))
+ val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s =>
+ (s.getName, toScalaOption(s.getNamespace).orNull))
.getOrElse(getAvroRecordNameAndNamespace(tblName))
- val sourceSchema = convertStructTypeToAvroSchema(df.schema,
avroRecordName, avroRecordNamespace)
+ val sourceSchema = HoodieSchema.fromAvroSchema(
+ convertStructTypeToAvroSchema(df.schema, avroRecordName,
avroRecordNamespace)
+ )
val internalSchemaOpt =
HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig,
tableMetaClient).orElse {
// In case we need to reconcile the schema and schema evolution is
enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema &&
hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED))
{
val allowOperationMetaDataField =
parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(),
"false").toBoolean
-
Some(InternalSchemaConverter.convert(HoodieSchema.fromAvroSchema(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema),
allowOperationMetaDataField))))
+ //TODO need to rename the spark schema class to something else to
not have this be so confusing
Review Comment:
Is this in reference to the Utils class?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -270,14 +285,14 @@ object HoodieSchemaUtils {
// NOTE: By default Hudi doesn't allow automatic schema evolution to drop
the columns from the target
// table. However, when schema reconciliation is turned on, we would
allow columns to be dropped
// in the incoming batch (as these would be reconciled in anyway)
- if (isCompatibleProjectionOf(tableSchema, newSchema)) {
+ if (isCompatibleProjectionOf(tableSchema.toAvroSchema(),
newSchema.toAvroSchema())) {
Review Comment:
Use `HoodieSchemaCompatibility` here as well
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -479,7 +493,7 @@ class HoodieSparkSqlWriterInternal {
// Remove meta columns from writerSchema if isPrepped is true.
val processedDataSchema = if (preppedSparkSqlWrites ||
preppedSparkSqlMergeInto || preppedWriteOperation) {
- HoodieAvroUtils.removeMetadataFields(dataFileSchema)
+
HoodieSchema.fromAvroSchema(HoodieAvroUtils.removeMetadataFields(dataFileSchema.toAvroSchema()))
Review Comment:
Can you use `HoodieSchemaUtils` directly here?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -640,12 +654,14 @@ class HoodieSparkSqlWriterInternal {
.toSeq
}
- def generateSchemaWithoutPartitionColumns(partitionParam: String, schema:
Schema): Schema = {
+ def generateSchemaWithoutPartitionColumns(partitionParam: String, schema:
HoodieSchema): HoodieSchema = {
val partitionColumns = getPartitionColumns(partitionParam)
- HoodieAvroUtils.removeFields(schema, partitionColumns.toSet.asJava)
+ HoodieSchema.fromAvroSchema(
+ HoodieAvroUtils.removeFields(schema.toAvroSchema(),
partitionColumns.toSet.asJava)
Review Comment:
Let's add a method for removing fields instead of converting to/from Avro.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]