alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1024701207
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -81,20 +79,7 @@
public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();
/**
-  * The specified schema of the table. ("specified" denotes that this is configured by the client,
-  * as opposed to being implicitly fetched out of the commit metadata)
- */
- protected final Schema tableSchema;
- protected final Schema tableSchemaWithMetaFields;
Review Comment:
> and I also triaged the usages of WriterSchema (master) compared to tableSchema. It's used only in bootstrap cases.

It was also inverted in the case of MERGE INTO (in fact, this whole dichotomy was introduced to support it).

> I would expect all writer handles (create, append, etc.) to use the writerSchema, but I see that the table schema is used in these handles (as per master). Do we know why? After this patch it makes sense to me, but wondering if we are missing any case here.

That's exactly why it is being addressed in this PR -- it was inconsistent, confusing, and in some cases incorrect.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -172,23 +182,63 @@ object HoodieSparkSqlWriter {
}
     val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
-    val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
-    // short-circuit if bulk_insert via row is enabled.
+    // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
+    sparkContext.getConf.registerKryoClasses(
Review Comment:
This isn't actually going to have any effect here.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -239,83 +289,65 @@ object HoodieSparkSqlWriter {
         client.startCommitWithTime(instantTime, commitActionType)
         val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
         (writeStatuses, client)
-      }
-      case _ => { // any other operation
-        // register classes & schemas
-        val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-        sparkContext.getConf.registerKryoClasses(
-          Array(classOf[org.apache.avro.generic.GenericData],
-            classOf[org.apache.avro.Schema]))
-
-        // TODO(HUDI-4472) revisit and simplify schema handling
-        val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-        val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema)
-
-        val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
-        var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
-
-        val writerSchema: Schema =
-          if (reconcileSchema) {
-            // In case we need to reconcile the schema and schema evolution is enabled,
-            // we will force-apply schema evolution to the writer's schema
-            if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
-              internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
-            }
-            if (internalSchemaOpt.isDefined) {
-              // Apply schema evolution, by auto-merging write schema and read schema
-              val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
-              AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName)
-            } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
-              // In case schema reconciliation is enabled and source and latest table schemas
-              // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]]), then we will
-              // pick latest table's schema as the writer's schema
-              latestTableSchema
-            } else {
-              // Otherwise fallback to original source's schema
-              sourceSchema
-            }
-          } else {
-            // In case reconciliation is disabled, we still have to do nullability attributes
-            // (minor) reconciliation, making sure schema of the incoming batch is in-line with
-            // the data already committed in the table
-            AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
-          }
+      case _ =>
+        // Convert to RDD[HoodieRecord]
+        val avroRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace,
+          Some(writerSchema))
+
+        // Check whether partition columns should be persisted w/in the data-files, or should
+        // be instead omitted from them and simply encoded into the partition path (which is Spark's
+        // behavior by default)
+        // TODO move partition columns handling down into the handlers
+        val shouldDropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+        val dataFileSchema = if (shouldDropPartitionColumns) {
+          val truncatedSchema = generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema)
+          // NOTE: We have to register this schema w/ Kryo to make sure it's able to apply an optimization
+          //       allowing it to avoid the need to ser/de the whole schema along _every_ Avro record
+          registerAvroSchemasWithKryo(sparkContext, truncatedSchema)
+          truncatedSchema
+        } else {
+          writerSchema
+        }
-        validateSchemaForHoodieIsDeleted(writerSchema)
-        sparkContext.getConf.registerAvroSchemas(writerSchema)
-        log.info(s"Registered avro schema : ${writerSchema.toString(true)}")
+        // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native serialization framework
+        //       (due to containing cyclic refs), therefore we have to convert it to string before
+        //       passing onto the Executor
+        val dataFileSchemaStr = dataFileSchema.toString
-        // Convert to RDD[HoodieRecord]
-        val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
-          org.apache.hudi.common.util.Option.of(writerSchema))
         val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
           operation.equals(WriteOperationType.UPSERT) ||
           parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
-        val hoodieAllIncomingRecords = genericRecords.map(gr => {
-          val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
-          val hoodieRecord = if (shouldCombine) {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
-              DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-              DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(processedRecord,
-              orderingVal,
-              keyGenerator.getKey(gr),
-              hoodieConfig.getString(PAYLOAD_CLASS_NAME))
-          } else {
-            DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+
+        val hoodieRecords = avroRecords.mapPartitions(it => {
Review Comment:
This does not change the DAG: there is no difference between `map` and `mapPartitions` other than the scope of the lambdas passed to them (`mapPartitions` allows some work, such as Avro schema parsing, to be performed just once per partition).
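The point about lambda scope can be illustrated outside of Spark. This is a hedged sketch, not Hudi or Spark code: plain Java lists stand in for RDD partitions, and `parseSchema` is a hypothetical stand-in for an expensive call like `new Schema.Parser().parse(schemaStr)`.

```java
// Hedged sketch: why a mapPartitions-style lambda can amortize expensive
// per-record work such as Avro schema parsing.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MapPartitionsSketch {
    // Counts how many times the "expensive" parse runs
    static final AtomicInteger parseCount = new AtomicInteger();

    static String parseSchema(String schemaStr) {
        parseCount.incrementAndGet();
        return "parsed:" + schemaStr;
    }

    // map-style: the lambda is scoped to a single record, so a naive
    // implementation re-parses the schema for every record
    static List<String> mapStyle(List<String> partition, String schemaStr) {
        List<String> out = new ArrayList<>();
        for (String record : partition) {
            String schema = parseSchema(schemaStr); // once PER RECORD
            out.add(schema + "/" + record);
        }
        return out;
    }

    // mapPartitions-style: the lambda is scoped to the whole partition, so
    // the parse runs once and is reused for every record in it
    static List<String> mapPartitionsStyle(List<String> partition, String schemaStr) {
        String schema = parseSchema(schemaStr); // once PER PARTITION
        List<String> out = new ArrayList<>();
        for (String record : partition) {
            out.add(schema + "/" + record);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("r1", "r2", "r3");

        parseCount.set(0);
        mapStyle(partition, "avro-schema");
        System.out.println("map-style parses: " + parseCount.get());           // 3

        parseCount.set(0);
        mapPartitionsStyle(partition, "avro-schema");
        System.out.println("mapPartitions-style parses: " + parseCount.get()); // 1
    }
}
```

Either way the resulting records (and hence the DAG) are identical; only the number of `parseSchema` invocations differs.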
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -567,36 +682,25 @@ object HoodieSparkSqlWriter {
}
   def bulkInsertAsRow(sqlContext: SQLContext,
-                      parameters: Map[String, String],
+                      hoodieConfig: HoodieConfig,
                       df: DataFrame,
                       tblName: String,
                       basePath: Path,
                       path: String,
                       instantTime: String,
-                      partitionColumns: String): (Boolean, common.util.Option[String]) = {
-    val sparkContext = sqlContext.sparkContext
-    val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
-    val dropPartitionColumns = parameters.get(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key()).map(_.toBoolean)
-      .getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
-    // register classes & schemas
-    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-    sparkContext.getConf.registerKryoClasses(
-      Array(classOf[org.apache.avro.generic.GenericData],
-        classOf[org.apache.avro.Schema]))
-    var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-    if (dropPartitionColumns) {
-      schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
-    }
-    validateSchemaForHoodieIsDeleted(schema)
-    sparkContext.getConf.registerAvroSchemas(schema)
-    log.info(s"Registered avro schema : ${schema.toString(true)}")
-    if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
+                      writerSchema: Schema): (Boolean, common.util.Option[String]) = {
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
       throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
     }
-    val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
-    params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
-    val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
+
+    val writerSchemaStr = writerSchema.toString
+
+    val opts = hoodieConfig.getProps.toMap ++
+      Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
+
+    val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, path, tblName, mapAsJavaMap(opts))
Review Comment:
We need `HoodieWriteConfig` (we actually include the whole `hoodieConfig` in it, L698).
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -347,6 +378,95 @@ object HoodieSparkSqlWriter {
}
}
+  /**
+   * Deduces writer's schema based on
+   * <ul>
+   *   <li>Source's schema</li>
+   *   <li>Target table's schema (including Hudi's [[InternalSchema]] representation)</li>
+   * </ul>
+   */
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: Option[Schema],
+                         internalSchemaOpt: Option[InternalSchema],
+                         opts: Map[String, String]): Schema = {
+    val shouldReconcileSchema = opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+
+    latestTableSchemaOpt match {
+      // In case table schema is empty we're just going to use the source schema as a
+      // writer's schema. No additional handling is required
+      case None => sourceSchema
+      // Otherwise, we need to make sure we reconcile incoming and latest table schemas
+      case Some(latestTableSchema) =>
+        // Before validating whether schemas are compatible, we need to "canonicalize" source's schema
+        // relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
+        // for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
+        // in the table's one we want to proceed aligning nullability constraints w/ the table's schema
+        val shouldCanonicalizeSchema = opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,
+          DataSourceWriteOptions.CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
+        val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
+          AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
+        } else {
+          sourceSchema
+        }
+
+        if (shouldReconcileSchema) {
+          internalSchemaOpt match {
+            case Some(internalSchema) =>
+              // Apply schema evolution, by auto-merging write schema and read schema
+              val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema)
+              AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName)
+
+            case None =>
+              // In case schema reconciliation is enabled and source and latest table schemas
+              // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]]), then we
+              // will rebase incoming batch onto the table's latest schema (ie, reconcile them)
+              //
+              // NOTE: Since we'll be converting incoming batch from [[sourceSchema]] into [[latestTableSchema]]
+              //       we're validating in that order (where [[sourceSchema]] is treated as a reader's schema,
+              //       and [[latestTableSchema]] is treated as a writer's schema)
+              //
+              // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
+              //       w/ the table's one and allow schemas to diverge. This is required in cases where
+              //       partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
+              //       only incoming dataset's projection has to match the table's schema, and not the whole one
+              if (!shouldValidateSchemasCompatibility || TableSchemaResolver.isSchemaCompatible(canonicalizedSourceSchema, latestTableSchema)) {
Review Comment:
This primarily depends on whether reconciliation is enabled or not:
- if **enabled**: we will rewrite the incoming batch into the table's schema (if we can; e.g., if the target table has 4 columns and the incoming batch has 3, we will add the missing one filled with nulls)
- if **disabled**: we will rewrite the table into the incoming batch's schema (i.e., the other way around; note that we'd only rewrite the files that have records updated in them)
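A minimal sketch of the reconciliation-enabled direction described above. This is not Hudi's implementation: records are modeled as plain maps and a "schema" as a list of column names, and all names here are hypothetical.

```java
// Hedged sketch: with reconciliation ENABLED, the incoming record is
// rewritten into the table's schema, with missing columns filled with nulls.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReconcileSketch {
    // Rewrite an incoming record into the table's schema, filling columns
    // absent from the incoming batch with nulls
    static Map<String, Object> reconcileToTable(Map<String, Object> record,
                                                List<String> tableSchema) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String col : tableSchema) {
            out.put(col, record.getOrDefault(col, null)); // null fills the gap
        }
        return out;
    }

    public static void main(String[] args) {
        // Target table has 4 columns; the incoming batch carries only 3
        List<String> tableSchema = List.of("id", "name", "ts", "city");
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("id", 1);
        incoming.put("name", "a");
        incoming.put("ts", 100L);

        System.out.println(reconcileToTable(incoming, tableSchema));
        // {id=1, name=a, ts=100, city=null}
    }
}
```

With reconciliation disabled the direction flips: only the files containing records updated by the batch would be rewritten into the incoming batch's schema.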
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]