danny0405 commented on code in PR #9876:
URL: https://github.com/apache/hudi/pull/9876#discussion_r1365081774


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -449,21 +466,58 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       new StructType(targetTableSchema), structName, nameSpace)
   }
 
+  /**
+   * @param conditionalAssignments Conditional assignments.
+   * @return Updated fields based on the conditional assignments in the MERGE 
INTO statement.
+   */
+  private def getUpdatedFields(conditionalAssignments: Seq[Seq[Assignment]]): 
Seq[String] = {
+    val updatedFieldsSeq = {
+      conditionalAssignments.flatMap {
+        case assignments =>
+          // Extract all fields that are updated through the assignments
+          if (assignments.nonEmpty) {
+            assignments.map {
+              case Assignment(attr: Attribute, _) => attr
+              case a =>
+                throw new AnalysisException(s"Only assignments of the form 
`t.field = ...` are supported at the moment (provided: `${a.sql}`)")

Review Comment:
   Does throwing here cause any functional regression?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -755,6 +756,24 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Whether to write record positions to the block 
header for data blocks containing updates and delete blocks. "
           + "The record positions can be used to improve the performance of 
merging records from base and log files.");
 
+  public static final ConfigProperty<Boolean> WRITE_PARTIAL_UPDATES = 
ConfigProperty
+      .key("hoodie.write.partial.updates")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Whether to write partial updates to the data blocks 
containing updates "
+          + "in MOR tables. The data blocks containing partial updates have a 
schema with a "
+          + "subset of fields compared to the full schema of the table. 
Partial updates are "
+          + "automatically turned on for Spark SQL MERGE INTO statement with 
upserts to MOR tables.");
+
+  public static final ConfigProperty<String> WRITE_PARTIAL_UPDATE_SCHEMA = 
ConfigProperty
+      .key("hoodie.write.partial.update.schema")
+      .defaultValue("")
+      .markAdvanced()

Review Comment:
   Can we merge the two options into one? If 
`hoodie.write.partial.update.schema` is non-empty, that means partial 
update is turned on.



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -411,10 +414,14 @@ object ExpressionPayload {
     parseSchema(props.getProperty(PAYLOAD_RECORD_AVRO_SCHEMA))
   }
 
-  private def getWriterSchema(props: Properties): Schema = {
-    
ValidationUtils.checkArgument(props.containsKey(HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key),
-      s"Missing ${HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key} property")
-    parseSchema(props.getProperty(HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key))
+  private def getWriterSchema(props: Properties, isPartialUpdate: Boolean): 
Schema = {
+    if (isPartialUpdate) {
+      
parseSchema(props.getProperty(HoodieWriteConfig.WRITE_PARTIAL_UPDATE_SCHEMA.key))

Review Comment:
   I was actually expecting that we always use the full schema as the write schema, 
while a partial update carries an additional configuration for the partial 
schema, which is more straightforward.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -652,6 +660,16 @@ private static Map<HeaderMetadataType, String> 
getUpdatedHeader(Map<HeaderMetada
     if (addBlockIdentifier && 
!HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block 
sequence numbers only for data table.
       updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, attemptNumber + 
"," + blockSequenceNumber);
     }
+    if (config.shouldWritePartialUpdates()) {
+      // When enabling writing partial updates to the data blocks, the full 
schema is also written
+      // to the block header so that the reader can differentiate partial 
updates vs schema
+      // evolution, based on the "SCHEMA" which contains the partial schema 
and the "FULL_SCHEMA"
+      // which contains the full schema of the table at this time.
+      updatedHeader.put(
+          HeaderMetadataType.FULL_SCHEMA,
+          HoodieAvroUtils.addMetadataFields(
+              getWriteSchema(config), 
config.allowOperationMetadataField()).toString());

Review Comment:
   Can we fetch the full schema from the table config or write config?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to