This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ad4560f2a4 [HUDI-6800] Support writing partial updates to the data 
blocks in MOR tables (#9876)
0ad4560f2a4 is described below

commit 0ad4560f2a4de00e43814b0d6cef2886a8a38155
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Oct 25 15:24:26 2023 -0700

    [HUDI-6800] Support writing partial updates to the data blocks in MOR 
tables (#9876)
    
    This commit adds the functionality to write partial updates to the data 
blocks in MOR tables, for Spark SQL MERGE INTO.
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  18 ++-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  18 ++-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   2 +-
 .../common/table/log/block/HoodieLogBlock.java     |   2 +-
 .../org/apache/hudi/common/util/ConfigUtils.java   |  20 +--
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   9 ++
 .../hudi/command/MergeIntoHoodieTableCommand.scala | 147 +++++++++++++++------
 .../hudi/command/payload/ExpressionPayload.scala   |  20 ++-
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala |  12 +-
 .../spark/sql/hudi/TestMergeIntoTable2.scala       |   6 +
 .../sql/hudi/TestPartialUpdateForMergeInto.scala   |  83 ++++++++++--
 11 files changed, 268 insertions(+), 69 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 8c08beaaef9..cc3876338cc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.config.HoodieMetaserverConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
+import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
@@ -50,7 +51,6 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.marker.MarkerType;
-import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.ConfigUtils;
@@ -756,6 +756,14 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Whether to write record positions to the block 
header for data blocks containing updates and delete blocks. "
           + "The record positions can be used to improve the performance of 
merging records from base and log files.");
 
+  public static final ConfigProperty<String> WRITE_PARTIAL_UPDATE_SCHEMA = 
ConfigProperty
+      .key("hoodie.write.partial.update.schema")
+      .defaultValue("")
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Avro schema of the partial updates. This is 
automatically set by the "
+          + "Hudi write client and user is not expected to manually change the 
value.");
+
   /**
    * Config key with boolean value that indicates whether record being written 
during MERGE INTO Spark SQL
    * operation are already prepped.
@@ -2072,6 +2080,14 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(WRITE_RECORD_POSITIONS);
   }
 
+  public boolean shouldWritePartialUpdates() {
+    return !StringUtils.isNullOrEmpty(getString(WRITE_PARTIAL_UPDATE_SCHEMA));
+  }
+
+  public String getPartialUpdateSchema() {
+    return getString(WRITE_PARTIAL_UPDATE_SCHEMA);
+  }
+
   public double getParquetCompressionRatio() {
     return getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 4075541a750..cc1932ce27f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -149,7 +149,14 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
                             String partitionPath, String fileId, 
Iterator<HoodieRecord<T>> recordItr, TaskContextSupplier taskContextSupplier) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier);
+    super(config, instantTime, partitionPath, fileId, hoodieTable,
+        config.shouldWritePartialUpdates()
+            // When enabling writing partial updates to the data blocks in log 
files,
+            // i.e., partial update schema is set, the writer schema is the 
partial
+            // schema containing the updated fields only
+            ? Option.of(new 
Schema.Parser().parse(config.getPartialUpdateSchema()))
+            : Option.empty(),
+        taskContextSupplier);
     this.fileId = fileId;
     this.recordItr = recordItr;
     this.sizeEstimator = new DefaultSizeEstimator();
@@ -465,7 +472,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
 
         blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, 
shouldWriteRecordPositions,
             getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, 
config,
-            addBlockIdentifier()), keyField));
+                addBlockIdentifier()), keyField));
       }
 
       if (appendDeleteBlocks && recordsToDeleteWithPositions.size() > 0) {
@@ -652,6 +659,13 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     if (addBlockIdentifier && 
!HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block 
sequence numbers only for data table.
       updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, attemptNumber + 
"," + blockSequenceNumber);
     }
+    if (config.shouldWritePartialUpdates()) {
+      // When enabling writing partial updates to the data blocks, the 
"IS_PARTIAL" flag is also
+      // written to the block header so that the reader can differentiate 
partial updates, i.e.,
+      // the "SCHEMA" header contains the partial schema.
+      updatedHeader.put(
+          HeaderMetadataType.IS_PARTIAL, Boolean.toString(true));
+    }
     return updatedHeader;
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index e1d043a5a29..8c76e322b09 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -30,8 +30,8 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.LogFileCreationCallback;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.LogFileCreationCallback;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 443bcab9b8e..31d7d0d006b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -186,7 +186,7 @@ public abstract class HoodieLogBlock {
    * new enums at the end.
    */
   public enum HeaderMetadataType {
-    INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, 
COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_IDENTIFIER
+    INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, 
COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_IDENTIFIER, IS_PARTIAL
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 9e0655d6734..1fcde1b301b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -270,11 +270,11 @@ public class ConfigUtils {
    * Gets the raw value for a {@link ConfigProperty} config from properties. 
The key and
    * alternative keys are used to fetch the config.
    *
-   * @param props          Configs in {@link TypedProperties}.
+   * @param props          Configs in {@link Properties}.
    * @param configProperty {@link ConfigProperty} config to fetch.
    * @return {@link Option} of value if the config exists; empty {@link 
Option} otherwise.
    */
-  public static Option<Object> getRawValueWithAltKeys(TypedProperties props,
+  public static Option<Object> getRawValueWithAltKeys(Properties props,
                                                       ConfigProperty<?> 
configProperty) {
     if (props.containsKey(configProperty.key())) {
       return Option.ofNullable(props.get(configProperty.key()));
@@ -321,11 +321,11 @@ public class ConfigUtils {
    * alternative keys are used to fetch the config. If the config is not 
found, an
    * {@link IllegalArgumentException} is thrown.
    *
-   * @param props          Configs in {@link TypedProperties}.
+   * @param props          Configs in {@link Properties}.
    * @param configProperty {@link ConfigProperty} config of String type to 
fetch.
    * @return String value if the config exists.
    */
-  public static String getStringWithAltKeys(TypedProperties props,
+  public static String getStringWithAltKeys(Properties props,
                                             ConfigProperty<String> 
configProperty) {
     return getStringWithAltKeys(props, configProperty, false);
   }
@@ -337,14 +337,14 @@ public class ConfigUtils {
    * the properties. If not using default value, if the config is not found, an
    * {@link IllegalArgumentException} is thrown.
    *
-   * @param props           Configs in {@link TypedProperties}.
+   * @param props           Configs in {@link Properties}.
    * @param configProperty  {@link ConfigProperty} config of String type to 
fetch.
    * @param useDefaultValue Whether to use default value from {@link 
ConfigProperty}.
    * @return String value if the config exists; otherwise, if the config does 
not exist and
    * {@code useDefaultValue} is true, returns default String value if there is 
default value
    * defined in the {@link ConfigProperty} config and {@code null} otherwise.
    */
-  public static String getStringWithAltKeys(TypedProperties props,
+  public static String getStringWithAltKeys(Properties props,
                                             ConfigProperty<String> 
configProperty,
                                             boolean useDefaultValue) {
     if (useDefaultValue) {
@@ -363,12 +363,12 @@ public class ConfigUtils {
    * alternative keys are used to fetch the config. The default value as the 
input of the method
    * is returned if the config is not found in the properties.
    *
-   * @param props          Configs in {@link TypedProperties}.
+   * @param props          Configs in {@link Properties}.
    * @param configProperty {@link ConfigProperty} config of String type to 
fetch.
    * @param defaultValue   Default value.
    * @return String value if the config exists; default value otherwise.
    */
-  public static String getStringWithAltKeys(TypedProperties props,
+  public static String getStringWithAltKeys(Properties props,
                                             ConfigProperty<?> configProperty,
                                             String defaultValue) {
     Option<Object> rawValue = getRawValueWithAltKeys(props, configProperty);
@@ -429,12 +429,12 @@ public class ConfigUtils {
    * alternative keys are used to fetch the config. The default value of 
{@link ConfigProperty}
    * config, if exists, is returned if the config is not found in the 
properties.
    *
-   * @param props          Configs in {@link TypedProperties}.
+   * @param props          Configs in {@link Properties}.
    * @param configProperty {@link ConfigProperty} config to fetch.
    * @return boolean value if the config exists; default boolean value if the 
config does not exist
    * and there is default value defined in the {@link ConfigProperty} config; 
{@code false} otherwise.
    */
-  public static boolean getBooleanWithAltKeys(TypedProperties props,
+  public static boolean getBooleanWithAltKeys(Properties props,
                                               ConfigProperty<?> 
configProperty) {
     Option<Object> rawValue = getRawValueWithAltKeys(props, configProperty);
     boolean defaultValue = configProperty.hasDefaultValue()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 0c8ead9618c..e7364316205 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -553,6 +553,15 @@ object DataSourceWriteOptions {
       "look up as well. If you may use INSERT_INTO for mutable dataset, then 
you may have to set this config value to \"upsert\". With upsert, you will " +
       "get both precombine and updates to existing records on storage is also 
honored. If not, you may see duplicates. ")
 
+  val ENABLE_MERGE_INTO_PARTIAL_UPDATES: ConfigProperty[Boolean] = 
ConfigProperty
+    .key("hoodie.spark.sql.merge.into.partial.updates")
+    .defaultValue(false)
+    .markAdvanced()
+    .sinceVersion("1.0.0")
+    .withDocumentation("Whether to write partial updates to the data blocks 
containing updates "
+      + "in MOR tables with Spark SQL MERGE INTO statement. The data blocks 
containing partial "
+      + "updates have a schema with a subset of fields compared to the full 
schema of the table.")
+
   val NONE_INSERT_DUP_POLICY = "none"
   val DROP_INSERT_DUP_POLICY = "drop"
   val FAIL_INSERT_DUP_POLICY = "fail"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 253fae68ff1..4cbdf3778bd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, 
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, 
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -49,6 +49,7 @@ import 
org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
 
 import java.util.Base64
+import scala.collection.JavaConverters._
 
 /**
  * Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
@@ -402,29 +403,49 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // Append the table schema to the parameters. In the case of merge into, 
the schema of projectedJoinedDF
     // may be different from the target table, because the are transform 
logical in the update or
     // insert actions.
+    val fullSchema = getTableSchema
     var writeParams = parameters +
       (OPERATION.key -> operation) +
-      (HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key -> getTableSchema.toString) 
+
+      (HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key -> fullSchema.toString) +
       (DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType)
 
+    // Only enable writing partial updates to data blocks for upserts to MOR 
tables,
+    // when ENABLE_MERGE_INTO_PARTIAL_UPDATES is set to true
+    val writePartialUpdates = (targetTableType == MOR_TABLE_TYPE_OPT_VAL
+      && operation == UPSERT_OPERATION_OPT_VAL
+      && parameters.getOrElse(
+      ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
+      ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean)
+
+    if (writePartialUpdates) {
+      val updatedFieldSeq = getUpdatedFields(updatingActions.map(a => 
a.assignments))
+      writeParams ++= Seq(
+        WRITE_PARTIAL_UPDATE_SCHEMA.key ->
+          HoodieAvroUtils.generateProjectionSchema(fullSchema, 
updatedFieldSeq.asJava).toString
+      )
+    }
+
     writeParams ++= Seq(
       // Append (encoded) updating actions
       PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS ->
         // NOTE: For updating clause we allow partial assignments, where only 
some of the fields of the target
         //       table's records are updated (w/ the missing ones keeping 
their existing values)
         serializeConditionalAssignments(updatingActions.map(a => (a.condition, 
a.assignments)),
-          partialAssigmentMode = Some(PartialAssignmentMode.ORIGINAL_VALUE)),
+          partialAssignmentMode = Some(PartialAssignmentMode.ORIGINAL_VALUE),
+          keepUpdatedFieldsOnly = writePartialUpdates),
       // Append (encoded) inserting actions
       PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS ->
         serializeConditionalAssignments(insertingActions.map(a => 
(a.condition, a.assignments)),
-          partialAssigmentMode = Some(PartialAssignmentMode.NULL_VALUE),
+          partialAssignmentMode = Some(PartialAssignmentMode.NULL_VALUE),
+          keepUpdatedFieldsOnly = false,
           validator = validateInsertingAssignmentExpression)
     )
 
     // Append (encoded) deleting actions
     writeParams ++= deletingActions.headOption.map {
       case DeleteAction(condition) =>
-        PAYLOAD_DELETE_CONDITION -> 
serializeConditionalAssignments(Seq(condition -> Seq.empty))
+        PAYLOAD_DELETE_CONDITION -> 
serializeConditionalAssignments(Seq(condition -> Seq.empty),
+          keepUpdatedFieldsOnly = false)
     }.toSeq
 
     // Append
@@ -449,21 +470,58 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       new StructType(targetTableSchema), structName, nameSpace)
   }
 
+  /**
+   * @param conditionalAssignments Conditional assignments.
+   * @return Updated fields based on the conditional assignments in the MERGE 
INTO statement.
+   */
+  private def getUpdatedFields(conditionalAssignments: Seq[Seq[Assignment]]): 
Seq[String] = {
+    val updatedFieldsSeq = {
+      conditionalAssignments.flatMap {
+        case assignments =>
+          // Extract all fields that are updated through the assignments
+          if (assignments.nonEmpty) {
+            assignments.map {
+              case Assignment(attr: Attribute, _) => attr
+              case a =>
+                throw new AnalysisException(s"Only assignments of the form 
`t.field = ...` are supported at the moment (provided: `${a.sql}`)")
+            }
+          } else {
+            Seq.empty
+          }
+      }
+    }.toSet
+
+    // Reorder the assignments to follow the ordering of the target table
+    mergeInto.targetTable.output
+      .filterNot(attr => isMetaField(attr.name))
+      .filter { tableAttr =>
+        updatedFieldsSeq.exists(attr => attributeEquals(attr, tableAttr))
+      }
+      .map(attr => attr.name)
+  }
+
   /**
    * Binds and serializes sequence of [[(Expression, Seq[Expression])]] where
    * <ul>
-   *   <li>First [[Expression]] designates condition (in update/insert 
clause)</li>
-   *   <li>Second [[Seq[Expression] ]] designates individual column 
assignments (in update/insert clause)</li>
+   * <li>First [[Expression]] designates condition (in update/insert 
clause)</li>
+   * <li>Second [[Seq[Expression] ]] designates individual column assignments 
(in update/insert clause)</li>
    * </ul>
    *
    * Such that
    * <ol>
-   *   <li>All expressions are bound against expected payload layout (and 
ready to be code-gen'd)</li>
-   *   <li>Serialized into Base64 string to be subsequently passed to 
[[ExpressionPayload]]</li>
+   * <li>All expressions are bound against expected payload layout (and ready 
to be code-gen'd)</li>
+   * <li>Serialized into Base64 string to be subsequently passed to 
[[ExpressionPayload]]</li>
    * </ol>
+   *
+   * When [[keepUpdatedFieldsOnly]] is false, all fields in the target table 
schema have
+   * corresponding assignments from the generation; When 
[[keepUpdatedFieldsOnly]] is true,
+   * i.e., for partial updates, only the fields as the assignees of the 
assignments have
+   * corresponding assignments, so that the generated records for updates only 
contain
+   * updated fields, to be written to the log files in a MOR table.
    */
   private def serializeConditionalAssignments(conditionalAssignments: 
Seq[(Option[Expression], Seq[Assignment])],
-                                              partialAssigmentMode: 
Option[PartialAssignmentMode] = None,
+                                              partialAssignmentMode: 
Option[PartialAssignmentMode] = None,
+                                              keepUpdatedFieldsOnly: Boolean,
                                               validator: Expression => Unit = 
scalaFunction1Noop): String = {
     val boundConditionalAssignments =
       conditionalAssignments.map {
@@ -473,7 +531,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
           //       All other actions are expected to provide assignments 
correspondent to every field
           //       of the [[targetTable]] being assigned
           val reorderedAssignments = if (assignments.nonEmpty) {
-            alignAssignments(assignments, partialAssigmentMode)
+            alignAssignments(assignments, partialAssignmentMode, 
keepUpdatedFieldsOnly)
           } else {
             Seq.empty
           }
@@ -498,40 +556,53 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
   /**
    * Re-orders assignment expressions to adhere to the ordering of that of 
[[targetTable]]
    */
-  private def alignAssignments(
-              assignments: Seq[Assignment],
-              partialAssigmentMode: Option[PartialAssignmentMode]): 
Seq[Assignment] = {
+  private def alignAssignments(assignments: Seq[Assignment],
+                               partialAssignmentMode: 
Option[PartialAssignmentMode],
+                               keepUpdatedFieldsOnly: Boolean): 
Seq[Assignment] = {
     val attr2Assignments = assignments.map {
-      case assign @ Assignment(attr: Attribute, _) => attr -> assign
+      case assign@Assignment(attr: Attribute, _) => attr -> assign
       case a =>
         throw new AnalysisException(s"Only assignments of the form `t.field = 
...` are supported at the moment (provided: `${a.sql}`)")
     }
 
     // Reorder the assignments to follow the ordering of the target table
-    mergeInto.targetTable.output
-      .filterNot(attr => isMetaField(attr.name))
-      .map { attr =>
-        attr2Assignments.find(tuple => attributeEquals(tuple._1, attr)) match {
-          case Some((_, assignment)) => assignment
-          case None =>
-            // In case partial assignments are allowed and there's no 
corresponding conditional assignment,
-            // create a self-assignment for the target table's attribute
-            partialAssigmentMode match {
-              case Some(mode) =>
-                mode match {
-                  case PartialAssignmentMode.NULL_VALUE =>
-                    Assignment(attr, Literal(null))
-                  case PartialAssignmentMode.ORIGINAL_VALUE =>
-                    Assignment(attr, attr)
-                  case PartialAssignmentMode.DEFAULT_VALUE =>
-                    Assignment(attr, Literal.default(attr.dataType))
-                }
-              case _ =>
-                throw new AnalysisException(s"Assignment expressions have to 
assign every attribute of target table " +
-                  s"(provided: `${assignments.map(_.sql).mkString(",")}`)")
-            }
+    if (keepUpdatedFieldsOnly) {
+      mergeInto.targetTable.output
+        .map(attr =>
+          attr2Assignments.find(tuple => attributeEquals(tuple._1, attr))
+        )
+        .filter(e => e.nonEmpty)
+        .map(e => e.get._2)
+    } else {
+      mergeInto.targetTable.output
+        .filterNot(attr => isMetaField(attr.name))
+        .map { attr =>
+          attr2Assignments.find(tuple => attributeEquals(tuple._1, attr)) 
match {
+            case Some((_, assignment)) => assignment
+            case None =>
+              // In case partial assignments are allowed and there's no 
corresponding conditional assignment,
+              // create a self-assignment for the target table's attribute
+              partialAssignmentMode match {
+                case Some(mode) =>
+                  mode match {
+                    case PartialAssignmentMode.NULL_VALUE =>
+                      Assignment(attr, Literal(null))
+                    case PartialAssignmentMode.ORIGINAL_VALUE =>
+                      if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
+                        Assignment(attr, Literal(null))
+                      } else {
+                        Assignment(attr, attr)
+                      }
+                    case PartialAssignmentMode.DEFAULT_VALUE =>
+                      Assignment(attr, Literal.default(attr.dataType))
+                  }
+                case _ =>
+                  throw new AnalysisException(s"Assignment expressions have to 
assign every attribute of target table " +
+                    s"(provided: `${assignments.map(_.sql).mkString(",")}`)")
+              }
+          }
         }
-      }
+    }
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 0989b8b09ae..404bcf4ff62 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -28,7 +28,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodiePayloadProps, HoodieRecord}
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.{BinaryUtil, ValidationUtils, Option => 
HOption}
+import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, StringUtils, 
ValidationUtils, Option => HOption}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.spark.internal.Logging
@@ -124,7 +124,7 @@ class ExpressionPayload(@transient record: GenericRecord,
       // If the update condition matched  then execute assignment expression
       // to compute final record to update. We will return the first matched 
record.
       if (conditionEvalResult) {
-        val writerSchema = getWriterSchema(properties)
+        val writerSchema = getWriterSchema(properties, true)
         val resultingRow = assignmentEvaluator.apply(inputRecord.asRow)
         lazy val resultingAvroRecord = getAvroSerializerFor(writerSchema)
           .serialize(resultingRow)
@@ -204,7 +204,7 @@ class ExpressionPayload(@transient record: GenericRecord,
       // If matched the insert condition then execute the assignment 
expressions to compute the
       // result record. We will return the first matched record.
       if (conditionEvalResult) {
-        val writerSchema = getWriterSchema(properties)
+        val writerSchema = getWriterSchema(properties, false)
         val resultingRow = assignmentEvaluator.apply(inputRecord.asRow)
         val resultingAvroRecord = getAvroSerializerFor(writerSchema)
           .serialize(resultingRow)
@@ -411,6 +411,20 @@ object ExpressionPayload {
     parseSchema(props.getProperty(PAYLOAD_RECORD_AVRO_SCHEMA))
   }
 
+  private def getWriterSchema(props: Properties, shouldConsiderPartialUpdate: 
Boolean): Schema = {
+    if (shouldConsiderPartialUpdate) {
+      val partialSchema = ConfigUtils.getStringWithAltKeys(
+        props, HoodieWriteConfig.WRITE_PARTIAL_UPDATE_SCHEMA, true)
+      if (!StringUtils.isNullOrEmpty(partialSchema)) {
+        parseSchema(partialSchema)
+      } else {
+        getWriterSchema(props)
+      }
+    } else {
+      getWriterSchema(props)
+    }
+  }
+
   private def getWriterSchema(props: Properties): Schema = {
     
ValidationUtils.checkArgument(props.containsKey(HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key),
       s"Missing ${HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key} property")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 63adacbf129..51f88eb2907 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import org.apache.hudi.common.fs.FSUtils
+import 
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
 import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, 
HoodieSparkUtils, ScalaAssertionSupport}
 import org.apache.spark.sql.internal.SQLConf
 
@@ -261,7 +262,8 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Test MergeInto for MOR table ") {
-    withRecordType()(withTempDir {tmp =>
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+    withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val tableName = generateTableName
       // Create a mor partitioned table.
@@ -393,7 +395,8 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Test MergeInto with insert only") {
-    withRecordType()(withTempDir {tmp =>
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+    withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       // Create a partitioned mor table
       val tableName = generateTableName
@@ -448,6 +451,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Test MergeInto For PreCombineField") {
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
     withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       Seq("cow", "mor").foreach { tableType =>
@@ -522,6 +526,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Test MergeInto with preCombine field expression") {
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
     withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       Seq("cow", "mor").foreach { tableType =>
@@ -767,6 +772,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Merge Hudi to Hudi") {
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
     withRecordType()(withTempDir { tmp =>
       spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, 
false)
       spark.sql("set hoodie.payload.combined.schema.validate = true")
@@ -934,6 +940,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Test MergeInto For MOR With Compaction On") {
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
     withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val tableName = generateTableName
@@ -1193,6 +1200,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Test MergeInto with partial insert") {
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
     Seq(true, false).foreach { sparkSqlOptimizedWrites =>
       withRecordType()(withTempDir { tmp =>
         spark.sql("set hoodie.payload.combined.schema.validate = true")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index d5dcfd01ad1..b8f315575dd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import 
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
 import org.apache.spark.sql.Row
 
 class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 
   test("Test MergeInto for MOR table 2") {
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
     withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val tableName = generateTableName
@@ -178,6 +180,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
   }
 
   test("Test Merge With Complex Data Type") {
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
     withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val tableName = generateTableName
@@ -649,6 +652,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
   }
 
   test("Test Merge Into For Source Table With Different Column Order") {
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
     withRecordType()(withTempDir { tmp =>
       val tableName = generateTableName
       // Create a mor partitioned table.
@@ -765,6 +769,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
   }
 
   test("Test only insert for source table in dup key with preCombineField") {
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
     Seq("cow", "mor").foreach {
       tableType => {
         withTempDir { tmp =>
@@ -830,6 +835,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
   }
 
   test("Test only insert for source table in dup key without preCombineField") 
{
+    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 
${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.defaultValue()}")
     Seq("cow", "mor").foreach {
       tableType => {
         withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index 2284d76ab3a..4e6232fe3fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -17,15 +17,34 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.avro.Schema
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.function.SerializableFunctionUnchecked
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.table.log.HoodieLogFileReader
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType
+import org.apache.hudi.common.table.view.{FileSystemViewManager, 
FileSystemViewStorageConfig, SyncableFileSystemView}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.testutils.HoodieTestUtils.{getDefaultHadoopConf, 
getLogFileListFromFileSlice}
+import 
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
+import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+
+import java.util.{Collections, List}
+import scala.collection.JavaConverters._
 
 class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 
   test("Test Partial Update") {
     withTempDir { tmp =>
-      // TODO after we support partial update for MOR, we can add test case 
for 'mor'.
-      Seq("cow").foreach { tableType =>
+      Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
+        val basePath = tmp.getCanonicalPath + "/" + tableName
+        spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+        spark.sql(s"set 
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
         spark.sql(
           s"""
              |create table $tableName (
@@ -39,20 +58,60 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
              | primaryKey = 'id',
              | preCombineField = '_ts'
              |)
-             |location '${tmp.getCanonicalPath}/$tableName'
+             |location '$basePath'
           """.stripMargin)
         spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
 
         spark.sql(
           s"""
              |merge into $tableName t0
-             |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts) s0
+             |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts ) 
s0
              |on t0.id = s0.id
              |when matched then update set price = s0.price, _ts = s0.ts
              |""".stripMargin)
-        checkAnswer(s"select id, name, price, _ts from $tableName")(
-          Seq(1, "a1", 12.0, 1001)
-        )
+        if (tableType.equals("cow")) {
+          checkAnswer(s"select id, name, price, _ts from $tableName")(
+            Seq(1, "a1", 12.0, 1001)
+          )
+        } else {
+          // TODO(HUDI-6801): validate data once merging of partial updates is 
implemented
+          // Validate the log file
+          val hadoopConf = getDefaultHadoopConf
+          val metaClient: HoodieTableMetaClient =
+            
HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build
+          val metadataConfig = HoodieMetadataConfig.newBuilder.build
+          val engineContext = new HoodieLocalEngineContext(hadoopConf)
+          val viewManager: FileSystemViewManager = 
FileSystemViewManager.createViewManager(
+            engineContext, metadataConfig, 
FileSystemViewStorageConfig.newBuilder.build,
+            HoodieCommonConfig.newBuilder.build,
+            new SerializableFunctionUnchecked[HoodieTableMetaClient, 
HoodieTableMetadata] {
+              override def apply(v1: HoodieTableMetaClient): 
HoodieTableMetadata = {
+                HoodieTableMetadata.create(
+                  engineContext, metadataConfig, 
metaClient.getBasePathV2.toString)
+              }
+            }
+          )
+          val fsView: SyncableFileSystemView = 
viewManager.getFileSystemView(metaClient)
+          val fileSlice: FileSlice = fsView.getAllFileSlices("").findFirst.get
+          val logFilePathList: List[String] = 
getLogFileListFromFileSlice(fileSlice)
+          Collections.sort(logFilePathList)
+          assertEquals(1, logFilePathList.size)
+
+          val avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema
+          val logReader = new HoodieLogFileReader(
+            metaClient.getFs, new HoodieLogFile(logFilePathList.get(0)),
+            avroSchema, 1024 * 1024, true, false, false,
+            "id", null)
+          assertTrue(logReader.hasNext)
+          val logBlockHeader = logReader.next().getLogBlockHeader
+          assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
+          assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
+          val partialSchema = new 
Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
+          val expectedPartialSchema = 
HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
+            avroSchema, Seq("price", "_ts").asJava), false)
+          assertEquals(expectedPartialSchema, partialSchema)
+          
assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
+        }
 
         val tableName2 = generateTableName
         spark.sql(
@@ -77,9 +136,11 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
              |on t0.id = s0.id
              |when matched then update set price = s0.price
              |""".stripMargin)
-        checkAnswer(s"select id, name, price from $tableName2")(
-          Seq(1, "a1", 12.0)
-        )
+        if (tableType.equals("cow")) {
+          checkAnswer(s"select id, name, price from $tableName2")(
+            Seq(1, "a1", 12.0)
+          )
+        }
       }
     }
   }


Reply via email to