danny0405 commented on code in PR #9876:
URL: https://github.com/apache/hudi/pull/9876#discussion_r1365081774
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -449,21 +466,58 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
new StructType(targetTableSchema), structName, nameSpace)
}
+ /**
+ * @param conditionalAssignments Conditional assignments.
+ * @return Updated fields based on the conditional assignments in the MERGE
INTO statement.
+ */
+ private def getUpdatedFields(conditionalAssignments: Seq[Seq[Assignment]]):
Seq[String] = {
+ val updatedFieldsSeq = {
+ conditionalAssignments.flatMap {
+ case assignments =>
+ // Extract all fields that are updated through the assignments
+ if (assignments.nonEmpty) {
+ assignments.map {
+ case Assignment(attr: Attribute, _) => attr
+ case a =>
+ throw new AnalysisException(s"Only assignments of the form
`t.field = ...` are supported at the moment (provided: `${a.sql}`)")
Review Comment:
Does throwing here cause any functional regression?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -755,6 +756,24 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Whether to write record positions to the block
header for data blocks containing updates and delete blocks. "
+ "The record positions can be used to improve the performance of
merging records from base and log files.");
+ public static final ConfigProperty<Boolean> WRITE_PARTIAL_UPDATES =
ConfigProperty
+ .key("hoodie.write.partial.updates")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Whether to write partial updates to the data blocks
containing updates "
+ + "in MOR tables. The data blocks containing partial updates have a
schema with a "
+ + "subset of fields compared to the full schema of the table.
Partial updates are "
+ + "automatically turned on for Spark SQL MERGE INTO statement with
upserts to MOR tables.");
+
+ public static final ConfigProperty<String> WRITE_PARTIAL_UPDATE_SCHEMA =
ConfigProperty
+ .key("hoodie.write.partial.update.schema")
+ .defaultValue("")
+ .markAdvanced()
Review Comment:
Can we merge the two options into one? If
`hoodie.write.partial.update.schema` is non-empty, that means partial
update is turned on.
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -411,10 +414,14 @@ object ExpressionPayload {
parseSchema(props.getProperty(PAYLOAD_RECORD_AVRO_SCHEMA))
}
- private def getWriterSchema(props: Properties): Schema = {
-
ValidationUtils.checkArgument(props.containsKey(HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key),
- s"Missing ${HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key} property")
- parseSchema(props.getProperty(HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key))
+ private def getWriterSchema(props: Properties, isPartialUpdate: Boolean):
Schema = {
+ if (isPartialUpdate) {
+
parseSchema(props.getProperty(HoodieWriteConfig.WRITE_PARTIAL_UPDATE_SCHEMA.key))
Review Comment:
I was actually expecting that we always use the full schema as the write schema,
while a partial update carries a separate configuration for the partial
schema, which is more straightforward.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -652,6 +660,16 @@ private static Map<HeaderMetadataType, String>
getUpdatedHeader(Map<HeaderMetada
if (addBlockIdentifier &&
!HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block
sequence numbers only for data table.
updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, attemptNumber +
"," + blockSequenceNumber);
}
+ if (config.shouldWritePartialUpdates()) {
+ // When enabling writing partial updates to the data blocks, the full
schema is also written
+ // to the block header so that the reader can differentiate partial
updates vs schema
+ // evolution, based on the "SCHEMA" which contains the partial schema
and the "FULL_SCHEMA"
+ // which contains the full schema of the table at this time.
+ updatedHeader.put(
+ HeaderMetadataType.FULL_SCHEMA,
+ HoodieAvroUtils.addMetadataFields(
+ getWriteSchema(config),
config.allowOperationMetadataField()).toString());
Review Comment:
Can we fetch the full schema from the table config or write config?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]