This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0ad4560f2a4 [HUDI-6800] Support writing partial updates to the data
blocks in MOR tables (#9876)
0ad4560f2a4 is described below
commit 0ad4560f2a4de00e43814b0d6cef2886a8a38155
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Oct 25 15:24:26 2023 -0700
[HUDI-6800] Support writing partial updates to the data blocks in MOR
tables (#9876)
This commit adds the functionality to write partial updates to the data
blocks in MOR tables, for Spark SQL MERGE INTO.
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 18 ++-
.../org/apache/hudi/io/HoodieAppendHandle.java | 18 ++-
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 2 +-
.../common/table/log/block/HoodieLogBlock.java | 2 +-
.../org/apache/hudi/common/util/ConfigUtils.java | 20 +--
.../scala/org/apache/hudi/DataSourceOptions.scala | 9 ++
.../hudi/command/MergeIntoHoodieTableCommand.scala | 147 +++++++++++++++------
.../hudi/command/payload/ExpressionPayload.scala | 20 ++-
.../apache/spark/sql/hudi/TestMergeIntoTable.scala | 12 +-
.../spark/sql/hudi/TestMergeIntoTable2.scala | 6 +
.../sql/hudi/TestPartialUpdateForMergeInto.scala | 83 ++++++++++--
11 files changed, 268 insertions(+), 69 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 8c08beaaef9..cc3876338cc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.config.HoodieMetaserverConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
+import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
@@ -50,7 +51,6 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.marker.MarkerType;
-import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ConfigUtils;
@@ -756,6 +756,14 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Whether to write record positions to the block
header for data blocks containing updates and delete blocks. "
+ "The record positions can be used to improve the performance of
merging records from base and log files.");
+ public static final ConfigProperty<String> WRITE_PARTIAL_UPDATE_SCHEMA =
ConfigProperty
+ .key("hoodie.write.partial.update.schema")
+ .defaultValue("")
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Avro schema of the partial updates. This is
automatically set by the "
+ + "Hudi write client and user is not expected to manually change the
value.");
+
/**
* Config key with boolean value that indicates whether record being written
during MERGE INTO Spark SQL
* operation are already prepped.
@@ -2072,6 +2080,14 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(WRITE_RECORD_POSITIONS);
}
+ public boolean shouldWritePartialUpdates() {
+ return !StringUtils.isNullOrEmpty(getString(WRITE_PARTIAL_UPDATE_SCHEMA));
+ }
+
+ public String getPartialUpdateSchema() {
+ return getString(WRITE_PARTIAL_UPDATE_SCHEMA);
+ }
+
public double getParquetCompressionRatio() {
return getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 4075541a750..cc1932ce27f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -149,7 +149,14 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr, TaskContextSupplier taskContextSupplier) {
- super(config, instantTime, partitionPath, fileId, hoodieTable,
taskContextSupplier);
+ super(config, instantTime, partitionPath, fileId, hoodieTable,
+ config.shouldWritePartialUpdates()
+ // When enabling writing partial updates to the data blocks in log
files,
+ // i.e., partial update schema is set, the writer schema is the
partial
+ // schema containing the updated fields only
+ ? Option.of(new
Schema.Parser().parse(config.getPartialUpdateSchema()))
+ : Option.empty(),
+ taskContextSupplier);
this.fileId = fileId;
this.recordItr = recordItr;
this.sizeEstimator = new DefaultSizeEstimator();
@@ -465,7 +472,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList,
shouldWriteRecordPositions,
getUpdatedHeader(header, blockSequenceNumber++, attemptNumber,
config,
- addBlockIdentifier()), keyField));
+ addBlockIdentifier()), keyField));
}
if (appendDeleteBlocks && recordsToDeleteWithPositions.size() > 0) {
@@ -652,6 +659,13 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
if (addBlockIdentifier &&
!HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block
sequence numbers only for data table.
updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, attemptNumber +
"," + blockSequenceNumber);
}
+ if (config.shouldWritePartialUpdates()) {
+ // When enabling writing partial updates to the data blocks, the
"IS_PARTIAL" flag is also
+ // written to the block header so that the reader can differentiate
partial updates, i.e.,
+ // the "SCHEMA" header contains the partial schema.
+ updatedHeader.put(
+ HeaderMetadataType.IS_PARTIAL, Boolean.toString(true));
+ }
return updatedHeader;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index e1d043a5a29..8c76e322b09 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -30,8 +30,8 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.LogFileCreationCallback;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.LogFileCreationCallback;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 443bcab9b8e..31d7d0d006b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -186,7 +186,7 @@ public abstract class HoodieLogBlock {
* new enums at the end.
*/
public enum HeaderMetadataType {
- INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE,
COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_IDENTIFIER
+ INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE,
COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_IDENTIFIER, IS_PARTIAL
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 9e0655d6734..1fcde1b301b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -270,11 +270,11 @@ public class ConfigUtils {
* Gets the raw value for a {@link ConfigProperty} config from properties.
The key and
* alternative keys are used to fetch the config.
*
- * @param props Configs in {@link TypedProperties}.
+ * @param props Configs in {@link Properties}.
* @param configProperty {@link ConfigProperty} config to fetch.
* @return {@link Option} of value if the config exists; empty {@link
Option} otherwise.
*/
- public static Option<Object> getRawValueWithAltKeys(TypedProperties props,
+ public static Option<Object> getRawValueWithAltKeys(Properties props,
ConfigProperty<?>
configProperty) {
if (props.containsKey(configProperty.key())) {
return Option.ofNullable(props.get(configProperty.key()));
@@ -321,11 +321,11 @@ public class ConfigUtils {
* alternative keys are used to fetch the config. If the config is not
found, an
* {@link IllegalArgumentException} is thrown.
*
- * @param props Configs in {@link TypedProperties}.
+ * @param props Configs in {@link Properties}.
* @param configProperty {@link ConfigProperty} config of String type to
fetch.
* @return String value if the config exists.
*/
- public static String getStringWithAltKeys(TypedProperties props,
+ public static String getStringWithAltKeys(Properties props,
ConfigProperty<String>
configProperty) {
return getStringWithAltKeys(props, configProperty, false);
}
@@ -337,14 +337,14 @@ public class ConfigUtils {
* the properties. If not using default value, if the config is not found, an
* {@link IllegalArgumentException} is thrown.
*
- * @param props Configs in {@link TypedProperties}.
+ * @param props Configs in {@link Properties}.
* @param configProperty {@link ConfigProperty} config of String type to
fetch.
* @param useDefaultValue Whether to use default value from {@link
ConfigProperty}.
* @return String value if the config exists; otherwise, if the config does
not exist and
* {@code useDefaultValue} is true, returns default String value if there is
default value
* defined in the {@link ConfigProperty} config and {@code null} otherwise.
*/
- public static String getStringWithAltKeys(TypedProperties props,
+ public static String getStringWithAltKeys(Properties props,
ConfigProperty<String>
configProperty,
boolean useDefaultValue) {
if (useDefaultValue) {
@@ -363,12 +363,12 @@ public class ConfigUtils {
* alternative keys are used to fetch the config. The default value as the
input of the method
* is returned if the config is not found in the properties.
*
- * @param props Configs in {@link TypedProperties}.
+ * @param props Configs in {@link Properties}.
* @param configProperty {@link ConfigProperty} config of String type to
fetch.
* @param defaultValue Default value.
* @return String value if the config exists; default value otherwise.
*/
- public static String getStringWithAltKeys(TypedProperties props,
+ public static String getStringWithAltKeys(Properties props,
ConfigProperty<?> configProperty,
String defaultValue) {
Option<Object> rawValue = getRawValueWithAltKeys(props, configProperty);
@@ -429,12 +429,12 @@ public class ConfigUtils {
* alternative keys are used to fetch the config. The default value of
{@link ConfigProperty}
* config, if exists, is returned if the config is not found in the
properties.
*
- * @param props Configs in {@link TypedProperties}.
+ * @param props Configs in {@link Properties}.
* @param configProperty {@link ConfigProperty} config to fetch.
* @return boolean value if the config exists; default boolean value if the
config does not exist
* and there is default value defined in the {@link ConfigProperty} config;
{@code false} otherwise.
*/
- public static boolean getBooleanWithAltKeys(TypedProperties props,
+ public static boolean getBooleanWithAltKeys(Properties props,
ConfigProperty<?>
configProperty) {
Option<Object> rawValue = getRawValueWithAltKeys(props, configProperty);
boolean defaultValue = configProperty.hasDefaultValue()
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 0c8ead9618c..e7364316205 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -553,6 +553,15 @@ object DataSourceWriteOptions {
"look up as well. If you may use INSERT_INTO for mutable dataset, then
you may have to set this config value to \"upsert\". With upsert, you will " +
"get both precombine and updates to existing records on storage is also
honored. If not, you may see duplicates. ")
+ val ENABLE_MERGE_INTO_PARTIAL_UPDATES: ConfigProperty[Boolean] =
ConfigProperty
+ .key("hoodie.spark.sql.merge.into.partial.updates")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Whether to write partial updates to the data blocks
containing updates "
+ + "in MOR tables with Spark SQL MERGE INTO statement. The data blocks
containing partial "
+ + "updates have a schema with a subset of fields compared to the full
schema of the table.")
+
val NONE_INSERT_DUP_POLICY = "none"
val DROP_INSERT_DUP_POLICY = "drop"
val FAIL_INSERT_DUP_POLICY = "fail"
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 253fae68ff1..4cbdf3778bd 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE,
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE,
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -49,6 +49,7 @@ import
org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
import java.util.Base64
+import scala.collection.JavaConverters._
/**
* Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
@@ -402,29 +403,49 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// Append the table schema to the parameters. In the case of merge into,
the schema of projectedJoinedDF
// may be different from the target table, because there are transform
logics in the update or
// insert actions.
+ val fullSchema = getTableSchema
var writeParams = parameters +
(OPERATION.key -> operation) +
- (HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key -> getTableSchema.toString)
+
+ (HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key -> fullSchema.toString) +
(DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType)
+ // Only enable writing partial updates to data blocks for upserts to MOR
tables,
+ // when ENABLE_MERGE_INTO_PARTIAL_UPDATES is set to true
+ val writePartialUpdates = (targetTableType == MOR_TABLE_TYPE_OPT_VAL
+ && operation == UPSERT_OPERATION_OPT_VAL
+ && parameters.getOrElse(
+ ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
+ ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean)
+
+ if (writePartialUpdates) {
+ val updatedFieldSeq = getUpdatedFields(updatingActions.map(a =>
a.assignments))
+ writeParams ++= Seq(
+ WRITE_PARTIAL_UPDATE_SCHEMA.key ->
+ HoodieAvroUtils.generateProjectionSchema(fullSchema,
updatedFieldSeq.asJava).toString
+ )
+ }
+
writeParams ++= Seq(
// Append (encoded) updating actions
PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS ->
// NOTE: For updating clause we allow partial assignments, where only
some of the fields of the target
// table's records are updated (w/ the missing ones keeping
their existing values)
serializeConditionalAssignments(updatingActions.map(a => (a.condition,
a.assignments)),
- partialAssigmentMode = Some(PartialAssignmentMode.ORIGINAL_VALUE)),
+ partialAssignmentMode = Some(PartialAssignmentMode.ORIGINAL_VALUE),
+ keepUpdatedFieldsOnly = writePartialUpdates),
// Append (encoded) inserting actions
PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS ->
serializeConditionalAssignments(insertingActions.map(a =>
(a.condition, a.assignments)),
- partialAssigmentMode = Some(PartialAssignmentMode.NULL_VALUE),
+ partialAssignmentMode = Some(PartialAssignmentMode.NULL_VALUE),
+ keepUpdatedFieldsOnly = false,
validator = validateInsertingAssignmentExpression)
)
// Append (encoded) deleting actions
writeParams ++= deletingActions.headOption.map {
case DeleteAction(condition) =>
- PAYLOAD_DELETE_CONDITION ->
serializeConditionalAssignments(Seq(condition -> Seq.empty))
+ PAYLOAD_DELETE_CONDITION ->
serializeConditionalAssignments(Seq(condition -> Seq.empty),
+ keepUpdatedFieldsOnly = false)
}.toSeq
// Append
@@ -449,21 +470,58 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
new StructType(targetTableSchema), structName, nameSpace)
}
+ /**
+ * @param conditionalAssignments Conditional assignments.
+ * @return Updated fields based on the conditional assignments in the MERGE
INTO statement.
+ */
+ private def getUpdatedFields(conditionalAssignments: Seq[Seq[Assignment]]):
Seq[String] = {
+ val updatedFieldsSeq = {
+ conditionalAssignments.flatMap {
+ case assignments =>
+ // Extract all fields that are updated through the assignments
+ if (assignments.nonEmpty) {
+ assignments.map {
+ case Assignment(attr: Attribute, _) => attr
+ case a =>
+ throw new AnalysisException(s"Only assignments of the form
`t.field = ...` are supported at the moment (provided: `${a.sql}`)")
+ }
+ } else {
+ Seq.empty
+ }
+ }
+ }.toSet
+
+ // Reorder the assignments to follow the ordering of the target table
+ mergeInto.targetTable.output
+ .filterNot(attr => isMetaField(attr.name))
+ .filter { tableAttr =>
+ updatedFieldsSeq.exists(attr => attributeEquals(attr, tableAttr))
+ }
+ .map(attr => attr.name)
+ }
+
/**
* Binds and serializes sequence of [[(Expression, Seq[Expression])]] where
* <ul>
- * <li>First [[Expression]] designates condition (in update/insert
clause)</li>
- * <li>Second [[Seq[Expression] ]] designates individual column
assignments (in update/insert clause)</li>
+ * <li>First [[Expression]] designates condition (in update/insert
clause)</li>
+ * <li>Second [[Seq[Expression] ]] designates individual column assignments
(in update/insert clause)</li>
* </ul>
*
* Such that
* <ol>
- * <li>All expressions are bound against expected payload layout (and
ready to be code-gen'd)</li>
- * <li>Serialized into Base64 string to be subsequently passed to
[[ExpressionPayload]]</li>
+ * <li>All expressions are bound against expected payload layout (and ready
to be code-gen'd)</li>
+ * <li>Serialized into Base64 string to be subsequently passed to
[[ExpressionPayload]]</li>
* </ol>
+ *
+ * When [[keepUpdatedFieldsOnly]] is false, all fields in the target table
schema have
+ * corresponding assignments from the generation; When
[[keepUpdatedFieldsOnly]] is true,
+ * i.e., for partial updates, only the fields as the assignees of the
assignments have
+ * corresponding assignments, so that the generated records for updates only
contain
+ * updated fields, to be written to the log files in a MOR table.
*/
private def serializeConditionalAssignments(conditionalAssignments:
Seq[(Option[Expression], Seq[Assignment])],
- partialAssigmentMode:
Option[PartialAssignmentMode] = None,
+ partialAssignmentMode:
Option[PartialAssignmentMode] = None,
+ keepUpdatedFieldsOnly: Boolean,
validator: Expression => Unit =
scalaFunction1Noop): String = {
val boundConditionalAssignments =
conditionalAssignments.map {
@@ -473,7 +531,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// All other actions are expected to provide assignments
correspondent to every field
// of the [[targetTable]] being assigned
val reorderedAssignments = if (assignments.nonEmpty) {
- alignAssignments(assignments, partialAssigmentMode)
+ alignAssignments(assignments, partialAssignmentMode,
keepUpdatedFieldsOnly)
} else {
Seq.empty
}
@@ -498,40 +556,53 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
/**
* Re-orders assignment expressions to adhere to the ordering of that of
[[targetTable]]
*/
- private def alignAssignments(
- assignments: Seq[Assignment],
- partialAssigmentMode: Option[PartialAssignmentMode]):
Seq[Assignment] = {
+ private def alignAssignments(assignments: Seq[Assignment],
+ partialAssignmentMode:
Option[PartialAssignmentMode],
+ keepUpdatedFieldsOnly: Boolean):
Seq[Assignment] = {
val attr2Assignments = assignments.map {
- case assign @ Assignment(attr: Attribute, _) => attr -> assign
+ case assign@Assignment(attr: Attribute, _) => attr -> assign
case a =>
throw new AnalysisException(s"Only assignments of the form `t.field =
...` are supported at the moment (provided: `${a.sql}`)")
}
// Reorder the assignments to follow the ordering of the target table
- mergeInto.targetTable.output
- .filterNot(attr => isMetaField(attr.name))
- .map { attr =>
- attr2Assignments.find(tuple => attributeEquals(tuple._1, attr)) match {
- case Some((_, assignment)) => assignment
- case None =>
- // In case partial assignments are allowed and there's no
corresponding conditional assignment,
- // create a self-assignment for the target table's attribute
- partialAssigmentMode match {
- case Some(mode) =>
- mode match {
- case PartialAssignmentMode.NULL_VALUE =>
- Assignment(attr, Literal(null))
- case PartialAssignmentMode.ORIGINAL_VALUE =>
- Assignment(attr, attr)
- case PartialAssignmentMode.DEFAULT_VALUE =>
- Assignment(attr, Literal.default(attr.dataType))
- }
- case _ =>
- throw new AnalysisException(s"Assignment expressions have to
assign every attribute of target table " +
- s"(provided: `${assignments.map(_.sql).mkString(",")}`)")
- }
+ if (keepUpdatedFieldsOnly) {
+ mergeInto.targetTable.output
+ .map(attr =>
+ attr2Assignments.find(tuple => attributeEquals(tuple._1, attr))
+ )
+ .filter(e => e.nonEmpty)
+ .map(e => e.get._2)
+ } else {
+ mergeInto.targetTable.output
+ .filterNot(attr => isMetaField(attr.name))
+ .map { attr =>
+ attr2Assignments.find(tuple => attributeEquals(tuple._1, attr))
match {
+ case Some((_, assignment)) => assignment
+ case None =>
+ // In case partial assignments are allowed and there's no
corresponding conditional assignment,
+ // create a self-assignment for the target table's attribute
+ partialAssignmentMode match {
+ case Some(mode) =>
+ mode match {
+ case PartialAssignmentMode.NULL_VALUE =>
+ Assignment(attr, Literal(null))
+ case PartialAssignmentMode.ORIGINAL_VALUE =>
+ if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
+ Assignment(attr, Literal(null))
+ } else {
+ Assignment(attr, attr)
+ }
+ case PartialAssignmentMode.DEFAULT_VALUE =>
+ Assignment(attr, Literal.default(attr.dataType))
+ }
+ case _ =>
+ throw new AnalysisException(s"Assignment expressions have to
assign every attribute of target table " +
+ s"(provided: `${assignments.map(_.sql).mkString(",")}`)")
+ }
+ }
}
- }
+ }
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 0989b8b09ae..404bcf4ff62 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -28,7 +28,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodiePayloadProps, HoodieRecord}
import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.{BinaryUtil, ValidationUtils, Option =>
HOption}
+import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, StringUtils,
ValidationUtils, Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.spark.internal.Logging
@@ -124,7 +124,7 @@ class ExpressionPayload(@transient record: GenericRecord,
// If the update condition matched then execute assignment expression
// to compute final record to update. We will return the first matched
record.
if (conditionEvalResult) {
- val writerSchema = getWriterSchema(properties)
+ val writerSchema = getWriterSchema(properties, true)
val resultingRow = assignmentEvaluator.apply(inputRecord.asRow)
lazy val resultingAvroRecord = getAvroSerializerFor(writerSchema)
.serialize(resultingRow)
@@ -204,7 +204,7 @@ class ExpressionPayload(@transient record: GenericRecord,
// If matched the insert condition then execute the assignment
expressions to compute the
// result record. We will return the first matched record.
if (conditionEvalResult) {
- val writerSchema = getWriterSchema(properties)
+ val writerSchema = getWriterSchema(properties, false)
val resultingRow = assignmentEvaluator.apply(inputRecord.asRow)
val resultingAvroRecord = getAvroSerializerFor(writerSchema)
.serialize(resultingRow)
@@ -411,6 +411,20 @@ object ExpressionPayload {
parseSchema(props.getProperty(PAYLOAD_RECORD_AVRO_SCHEMA))
}
+ private def getWriterSchema(props: Properties, shouldConsiderPartialUpdate:
Boolean): Schema = {
+ if (shouldConsiderPartialUpdate) {
+ val partialSchema = ConfigUtils.getStringWithAltKeys(
+ props, HoodieWriteConfig.WRITE_PARTIAL_UPDATE_SCHEMA, true)
+ if (!StringUtils.isNullOrEmpty(partialSchema)) {
+ parseSchema(partialSchema)
+ } else {
+ getWriterSchema(props)
+ }
+ } else {
+ getWriterSchema(props)
+ }
+ }
+
private def getWriterSchema(props: Properties): Schema = {
ValidationUtils.checkArgument(props.containsKey(HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key),
s"Missing ${HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key} property")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 63adacbf129..51f88eb2907 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import org.apache.hudi.common.fs.FSUtils
+import
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers,
HoodieSparkUtils, ScalaAssertionSupport}
import org.apache.spark.sql.internal.SQLConf
@@ -261,7 +262,8 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
}
test("Test MergeInto for MOR table ") {
- withRecordType()(withTempDir {tmp =>
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+ withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
val tableName = generateTableName
// Create a mor partitioned table.
@@ -393,7 +395,8 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
}
test("Test MergeInto with insert only") {
- withRecordType()(withTempDir {tmp =>
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+ withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
// Create a partitioned mor table
val tableName = generateTableName
@@ -448,6 +451,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
}
test("Test MergeInto For PreCombineField") {
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
Seq("cow", "mor").foreach { tableType =>
@@ -522,6 +526,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
}
test("Test MergeInto with preCombine field expression") {
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
Seq("cow", "mor").foreach { tableType =>
@@ -767,6 +772,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
}
test("Merge Hudi to Hudi") {
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
withRecordType()(withTempDir { tmp =>
spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
false)
spark.sql("set hoodie.payload.combined.schema.validate = true")
@@ -934,6 +940,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
}
test("Test MergeInto For MOR With Compaction On") {
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
val tableName = generateTableName
@@ -1193,6 +1200,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
}
test("Test MergeInto with partial insert") {
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
Seq(true, false).foreach { sparkSqlOptimizedWrites =>
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index d5dcfd01ad1..b8f315575dd 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
+import
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
import org.apache.spark.sql.Row
class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
test("Test MergeInto for MOR table 2") {
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
val tableName = generateTableName
@@ -178,6 +180,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
}
test("Test Merge With Complex Data Type") {
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
val tableName = generateTableName
@@ -649,6 +652,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
}
test("Test Merge Into For Source Table With Different Column Order") {
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
// Create a mor partitioned table.
@@ -765,6 +769,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
}
test("Test only insert for source table in dup key with preCombineField") {
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
Seq("cow", "mor").foreach {
tableType => {
withTempDir { tmp =>
@@ -830,6 +835,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
}
test("Test only insert for source table in dup key without preCombineField")
{
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} =
${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.defaultValue()}")
Seq("cow", "mor").foreach {
tableType => {
withTempDir { tmp =>
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index 2284d76ab3a..4e6232fe3fe 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -17,15 +17,34 @@
package org.apache.spark.sql.hudi
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.avro.Schema
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.function.SerializableFunctionUnchecked
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.table.log.HoodieLogFileReader
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType
+import org.apache.hudi.common.table.view.{FileSystemViewManager,
FileSystemViewStorageConfig, SyncableFileSystemView}
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.testutils.HoodieTestUtils.{getDefaultHadoopConf,
getLogFileListFromFileSlice}
+import
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
+import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+
+import java.util.{Collections, List}
+import scala.collection.JavaConverters._
class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
test("Test Partial Update") {
withTempDir { tmp =>
- // TODO after we support partial update for MOR, we can add test case
for 'mor'.
- Seq("cow").foreach { tableType =>
+ Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
+ val basePath = tmp.getCanonicalPath + "/" + tableName
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+ spark.sql(s"set
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(
s"""
|create table $tableName (
@@ -39,20 +58,60 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
| primaryKey = 'id',
| preCombineField = '_ts'
|)
- |location '${tmp.getCanonicalPath}/$tableName'
+ |location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(
s"""
|merge into $tableName t0
- |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts) s0
+ |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts )
s0
|on t0.id = s0.id
|when matched then update set price = s0.price, _ts = s0.ts
|""".stripMargin)
- checkAnswer(s"select id, name, price, _ts from $tableName")(
- Seq(1, "a1", 12.0, 1001)
- )
+ if (tableType.equals("cow")) {
+ checkAnswer(s"select id, name, price, _ts from $tableName")(
+ Seq(1, "a1", 12.0, 1001)
+ )
+ } else {
+ // TODO(HUDI-6801): validate data once merging of partial updates is
implemented
+ // Validate the log file
+ val hadoopConf = getDefaultHadoopConf
+ val metaClient: HoodieTableMetaClient =
+
HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build
+ val metadataConfig = HoodieMetadataConfig.newBuilder.build
+ val engineContext = new HoodieLocalEngineContext(hadoopConf)
+ val viewManager: FileSystemViewManager =
FileSystemViewManager.createViewManager(
+ engineContext, metadataConfig,
FileSystemViewStorageConfig.newBuilder.build,
+ HoodieCommonConfig.newBuilder.build,
+ new SerializableFunctionUnchecked[HoodieTableMetaClient,
HoodieTableMetadata] {
+ override def apply(v1: HoodieTableMetaClient):
HoodieTableMetadata = {
+ HoodieTableMetadata.create(
+ engineContext, metadataConfig,
metaClient.getBasePathV2.toString)
+ }
+ }
+ )
+ val fsView: SyncableFileSystemView =
viewManager.getFileSystemView(metaClient)
+ val fileSlice: FileSlice = fsView.getAllFileSlices("").findFirst.get
+ val logFilePathList: List[String] =
getLogFileListFromFileSlice(fileSlice)
+ Collections.sort(logFilePathList)
+ assertEquals(1, logFilePathList.size)
+
+ val avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema
+ val logReader = new HoodieLogFileReader(
+ metaClient.getFs, new HoodieLogFile(logFilePathList.get(0)),
+ avroSchema, 1024 * 1024, true, false, false,
+ "id", null)
+ assertTrue(logReader.hasNext)
+ val logBlockHeader = logReader.next().getLogBlockHeader
+ assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
+ assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
+ val partialSchema = new
Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
+ val expectedPartialSchema =
HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
+ avroSchema, Seq("price", "_ts").asJava), false)
+ assertEquals(expectedPartialSchema, partialSchema)
+
assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
+ }
val tableName2 = generateTableName
spark.sql(
@@ -77,9 +136,11 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
|on t0.id = s0.id
|when matched then update set price = s0.price
|""".stripMargin)
- checkAnswer(s"select id, name, price from $tableName2")(
- Seq(1, "a1", 12.0)
- )
+ if (tableType.equals("cow")) {
+ checkAnswer(s"select id, name, price from $tableName2")(
+ Seq(1, "a1", 12.0)
+ )
+ }
}
}
}