This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 3be964096ac [HUDI-7046] Fix partial merging logic based on projected reader schema (#10011)
3be964096ac is described below
commit 3be964096ac625bafa4bc0b7625f3d05a0dd2994
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Nov 8 06:20:00 2023 -0800
    [HUDI-7046] Fix partial merging logic based on projected reader schema (#10011)
---
.../apache/hudi/merge/SparkRecordMergingUtils.java | 3 +-
.../hudi/common/engine/HoodieReaderContext.java | 14 ++
.../read/HoodieBaseFileGroupRecordBuffer.java | 52 +++---
.../read/HoodieKeyBasedFileGroupRecordBuffer.java | 9 +-
.../HoodiePositionBasedFileGroupRecordBuffer.java | 9 +-
.../sql/hudi/TestPartialUpdateForMergeInto.scala | 184 ++++++++++++---------
6 files changed, 165 insertions(+), 106 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
index f02ebfc8293..e72c6dab8a5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
@@ -115,6 +115,7 @@ public class SparkRecordMergingUtils {
     InternalRow newPartialRow = newer.getData();
     Map<Integer, StructField> mergedIdToFieldMapping = mergedSchemaPair.getLeft();
+    Map<String, Integer> oldNameToIdMapping = getCachedFieldNameToIdMapping(oldSchema);
     Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
     List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
     for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
@@ -125,7 +126,7 @@ public class SparkRecordMergingUtils {
         values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
       } else {
         // The field does not exist in the newer record; picks the value from older record
-        values.add(oldRow.get(fieldId, structField.dataType()));
+        values.add(oldRow.get(oldNameToIdMapping.get(structField.name()), structField.dataType()));
       }
     }
     InternalRow mergedRow = new GenericInternalRow(values.toArray());
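
For context: the bug fixed above only shows up when the older record's schema orders fields differently from the projected (merged) reader schema; indexing the old row by the merged schema's field ID then reads the wrong column. A minimal standalone Java sketch (all names illustrative, not part of this commit):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class NameBasedLookupSketch {
      public static void main(String[] args) {
        // Hypothetical old schema whose field order differs from the merged schema.
        List<String> oldSchemaFields = List.of("id", "price", "name");
        Object[] oldRow = {1, 10.0, "a1"};

        // Projected/merged reader schema order.
        List<String> mergedSchemaFields = List.of("id", "name", "price");

        // Build the name-to-id mapping from the OLD schema, as the fix does.
        Map<String, Integer> oldNameToIdMapping = new HashMap<>();
        for (int i = 0; i < oldSchemaFields.size(); i++) {
          oldNameToIdMapping.put(oldSchemaFields.get(i), i);
        }

        int mergedFieldId = mergedSchemaFields.indexOf("name"); // 1
        // Buggy: index the old row by the merged field ID -> 10.0 (the price, not the name).
        // Fixed: resolve the field ID through the old schema's name mapping -> "a1".
        System.out.println("buggy=" + oldRow[mergedFieldId]
            + ", fixed=" + oldRow[oldNameToIdMapping.get("name")]);
      }
    }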
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index d561b059183..b24a4109e75 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -174,4 +174,18 @@ public abstract class HoodieReaderContext<T> {
     meta.put(INTERNAL_META_SCHEMA, schema);
     return meta;
   }
+
+  /**
+   * Updates the schema and resets the ordering value in the existing metadata mapping of a record.
+   *
+   * @param meta   Metadata in a mapping.
+   * @param schema New schema to set.
+   * @return The input metadata mapping.
+   */
+  public Map<String, Object> updateSchemaAndResetOrderingValInMetadata(Map<String, Object> meta,
+                                                                       Schema schema) {
+    meta.remove(INTERNAL_META_ORDERING_FIELD);
+    meta.put(INTERNAL_META_SCHEMA, schema);
+    return meta;
+  }
 }
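
The helper's contract, restated as a small runnable sketch (string keys and values stand in for the real INTERNAL_META_* constants and the Avro Schema type; only the method body mirrors the code above):

    import java.util.HashMap;
    import java.util.Map;

    public class UpdateMetadataSketch {
      // Illustrative stand-ins for HoodieReaderContext's internal metadata keys.
      static final String INTERNAL_META_SCHEMA = "internal.meta.schema";
      static final String INTERNAL_META_ORDERING_FIELD = "internal.meta.ordering.field";

      // Same body as the new helper, with Object in place of Avro Schema.
      static Map<String, Object> updateSchemaAndResetOrderingValInMetadata(Map<String, Object> meta, Object schema) {
        meta.remove(INTERNAL_META_ORDERING_FIELD);
        meta.put(INTERNAL_META_SCHEMA, schema);
        return meta;
      }

      public static void main(String[] args) {
        Map<String, Object> meta = new HashMap<>();
        meta.put(INTERNAL_META_SCHEMA, "old-partial-schema");
        meta.put(INTERNAL_META_ORDERING_FIELD, 1001L);

        // After a partial merge, the record conforms to the merged schema, so the
        // schema entry is replaced and the stale ordering value is dropped.
        updateSchemaAndResetOrderingValInMetadata(meta, "merged-schema");
        System.out.println(meta); // {internal.meta.schema=merged-schema}
      }
    }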
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 90ebf71dfb1..b56885fb0d3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -118,35 +118,39 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
    * @return
    * @throws IOException
    */
-  protected Option<T> doProcessNextDataRecord(T record,
-                                              Map<String, Object> metadata,
-                                              Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
+  protected Option<Pair<T, Map<String, Object>>> doProcessNextDataRecord(T record,
+                                                                         Map<String, Object> metadata,
+                                                                         Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
     if (existingRecordMetadataPair != null) {
       // Merge and store the combined record
       // Note that the incoming `record` is from an older commit, so it should be put as
       // the `older` in the merge API
-      HoodieRecord<T> combinedRecord;
-      if (enablePartialMerging) {
-        combinedRecord = (HoodieRecord<T>) recordMerger.partialMerge(
-            readerContext.constructHoodieRecord(Option.of(record), metadata),
-            (Schema) metadata.get(INTERNAL_META_SCHEMA),
-            readerContext.constructHoodieRecord(
-                existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-            (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
-            readerSchema,
-            payloadProps).get().getLeft();
-      } else {
-        combinedRecord = (HoodieRecord<T>) recordMerger.merge(
-            readerContext.constructHoodieRecord(Option.of(record), metadata),
-            (Schema) metadata.get(INTERNAL_META_SCHEMA),
-            readerContext.constructHoodieRecord(
-                existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-            (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
-            payloadProps).get().getLeft();
-      }
+      Pair<HoodieRecord, Schema> combinedRecordAndSchema = enablePartialMerging
+          ? recordMerger.partialMerge(
+              readerContext.constructHoodieRecord(Option.of(record), metadata),
+              (Schema) metadata.get(INTERNAL_META_SCHEMA),
+              readerContext.constructHoodieRecord(
+                  existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+              (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+              readerSchema,
+              payloadProps).get()
+          : recordMerger.merge(
+              readerContext.constructHoodieRecord(Option.of(record), metadata),
+              (Schema) metadata.get(INTERNAL_META_SCHEMA),
+              readerContext.constructHoodieRecord(
+                  existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+              (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+              payloadProps).get();
+
+      HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
+
       // If pre-combine returns existing record, no need to update it
       if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
-        return Option.of(combinedRecord.getData());
+        return Option.of(Pair.of(
+            combinedRecord.getData(),
+            enablePartialMerging
+                ? readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, combinedRecordAndSchema.getRight())
+                : metadata));
       }
       return Option.empty();
     } else {
@@ -154,7 +158,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
       // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
       //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
       //       it since these records will be put into records(Map).
-      return Option.of(record);
+      return Option.of(Pair.of(record, metadata));
     }
   }
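
Why the metadata now travels with the merged record: under partial merging, two partial updates can carry different schemas, and the combined record conforms to their merged schema rather than to either input. A simplified standalone sketch (not Hudi code; field names taken from the test changes below):

    import java.util.LinkedHashSet;
    import java.util.List;

    public class PartialMergeSchemaSketch {
      public static void main(String[] args) {
        // First partial update touched {price, _ts}; second touched {description, _ts}.
        List<String> schemaA = List.of("price", "_ts");
        List<String> schemaB = List.of("description", "_ts");

        // The combined record covers the union of the touched fields.
        LinkedHashSet<String> mergedSchema = new LinkedHashSet<>(schemaA);
        mergedSchema.addAll(schemaB);

        // If the buffered metadata kept schemaA, a later merge would try to read
        // "description" out of a two-field record; hence the metadata is updated
        // to the merged schema when the merged pair is stored.
        System.out.println("merged schema = " + mergedSchema); // [price, _ts, description]
      }
    }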
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 6e5679adfbf..620c3d84940 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -86,9 +86,12 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
   @Override
   public void processNextDataRecord(T record, Map<String, Object> metadata, Object recordKey) throws IOException {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordKey);
-    Option<T> mergedRecord = doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
-    if (mergedRecord.isPresent()) {
-      records.put(recordKey, Pair.of(Option.ofNullable(readerContext.seal(mergedRecord.get())), metadata));
+    Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
+        doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
+    if (mergedRecordAndMetadata.isPresent()) {
+      records.put(recordKey, Pair.of(
+          Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft())),
+          mergedRecordAndMetadata.get().getRight()));
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 352e7c726d7..be8cf072919 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -114,9 +114,12 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
   @Override
   public void processNextDataRecord(T record, Map<String, Object> metadata, Object recordPosition) throws IOException {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordPosition);
-    Option<T> mergedRecord = doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
-    if (mergedRecord.isPresent()) {
-      records.put(recordPosition, Pair.of(Option.ofNullable(readerContext.seal(mergedRecord.get())), metadata));
+    Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
+        doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
+    if (mergedRecordAndMetadata.isPresent()) {
+      records.put(recordPosition, Pair.of(
+          Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft())),
+          mergedRecordAndMetadata.get().getRight()));
     }
   }
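
Both buffers above now apply the same pattern: store the metadata returned by doProcessNextDataRecord instead of the incoming metadata. A condensed runnable simulation of that pattern (java.util types stand in for Hudi's Option and Pair; all names illustrative):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class BufferUpdateSketch {
      public static void main(String[] args) {
        // Buffer of (record, metadata) pairs keyed by record key or position.
        Map<Object, SimpleEntry<String, String>> records = new HashMap<>();

        // Simulated merge result: the record together with metadata that matches
        // its actual (possibly merged) schema.
        Optional<SimpleEntry<String, String>> merged =
            Optional.of(new SimpleEntry<>("merged-record", "schema=merged-AB"));

        // Before the fix the buffer re-used the incoming metadata ("schema=partial-A");
        // now it keeps the metadata carried back from the merge.
        merged.ifPresent(pair -> records.put("key-1", new SimpleEntry<>(pair.getKey(), pair.getValue())));
        System.out.println(records);
      }
    }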
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index f1375f0749d..0d2b1e243eb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -37,92 +37,124 @@ import org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATE
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import java.io.File
 import java.util.{Collections, List}
 import scala.collection.JavaConverters._
 class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
- test("Test Partial Update") {
- withTempDir { tmp =>
- testPartialUpdate(tmp, "cow", "avro")
- testPartialUpdate(tmp, "mor", "avro")
- testPartialUpdate(tmp, "mor", "parquet")
- }
+ test("Test Partial Update with COW and Avro log format") {
+ testPartialUpdate("cow", "avro")
}
- def testPartialUpdate(tmp: File,
- tableType: String,
- logDataBlockFormat: String): Unit = {
- val tableName = generateTableName
- val basePath = tmp.getCanonicalPath + "/" + tableName
- spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
- spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
- spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
- spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
- spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | _ts long
- |) using hudi
- |tblproperties(
- | type ='$tableType',
- | primaryKey = 'id',
- | preCombineField = '_ts'
- |)
- |location '$basePath'
- """.stripMargin)
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-
- spark.sql(
- s"""
- |merge into $tableName t0
- |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts ) s0
- |on t0.id = s0.id
- |when matched then update set price = s0.price, _ts = s0.ts
- |""".stripMargin)
-
- checkAnswer(s"select id, name, price, _ts from $tableName")(
- Seq(1, "a1", 12.0, 1001)
- )
+ test("Test Partial Update with MOR and Avro log format") {
+ testPartialUpdate("mor", "avro")
+ }
- if (tableType.equals("mor")) {
- validateLogBlock(basePath, 1, Seq("price", "_ts"))
- }
+ test("Test Partial Update with MOR and Parquet log format") {
+ testPartialUpdate("mor", "parquet")
+ }
- if (tableType.equals("cow")) {
- // No preCombine field
- val tableName2 = generateTableName
+  def testPartialUpdate(tableType: String,
+                        logDataBlockFormat: String): Unit = {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath + "/" + tableName
+      spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+      spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+      spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
+      spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+      spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
+
+      // Create a table with five data fields
       spark.sql(
         s"""
-           |create table $tableName2 (
+           |create table $tableName (
            | id int,
            | name string,
-           | price double
+           | price double,
+           | _ts long,
+           | description string
            |) using hudi
            |tblproperties(
            | type ='$tableType',
-           | primaryKey = 'id'
+           | primaryKey = 'id',
+           | preCombineField = '_ts'
            |)
-           |location '${tmp.getCanonicalPath}/$tableName2'
+           |location '$basePath'
          """.stripMargin)
- spark.sql(s"insert into $tableName2 values(1, 'a1', 10)")
+ spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1:
desc1')," +
+ "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')")
+ // Partial updates using MERGE INTO statement with changed fields:
"price" and "_ts"
spark.sql(
s"""
- |merge into $tableName2 t0
- |using ( select 1 as id, 'a1' as name, 12 as price) s0
+ |merge into $tableName t0
+ |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts
+ |union select 3 as id, 'a3' as name, 25 as price, 1260 as ts) s0
|on t0.id = s0.id
- |when matched then update set price = s0.price
+ |when matched then update set price = s0.price, _ts = s0.ts
|""".stripMargin)
- checkAnswer(s"select id, name, price from $tableName2")(
- Seq(1, "a1", 12.0)
+ checkAnswer(s"select id, name, price, _ts, description from $tableName")(
+ Seq(1, "a1", 12.0, 1001, "a1: desc1"),
+ Seq(2, "a2", 20.0, 1200, "a2: desc2"),
+ Seq(3, "a3", 25.0, 1260, "a3: desc3")
)
+
+      if (tableType.equals("mor")) {
+        validateLogBlock(basePath, 1, Seq(Seq("price", "_ts")))
+      }
+
+      // Partial updates using MERGE INTO statement with changed fields: "description" and "_ts"
+      spark.sql(
+        s"""
+           |merge into $tableName t0
+           |using ( select 1 as id, 'a1' as name, 'a1: updated desc1' as description, 1023 as ts
+           |union select 2 as id, 'a2' as name, 'a2: updated desc2' as description, 1270 as ts) s0
+           |on t0.id = s0.id
+           |when matched then update set description = s0.description, _ts = s0.ts
+           |""".stripMargin)
+
+      checkAnswer(s"select id, name, price, _ts, description from $tableName")(
+        Seq(1, "a1", 12.0, 1023, "a1: updated desc1"),
+        Seq(2, "a2", 20.0, 1270, "a2: updated desc2"),
+        Seq(3, "a3", 25.0, 1260, "a3: desc3")
+      )
+
+      if (tableType.equals("mor")) {
+        validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("_ts", "description")))
+      }
+
+      if (tableType.equals("cow")) {
+        // No preCombine field
+        val tableName2 = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName2 (
+             | id int,
+             | name string,
+             | price double
+             |) using hudi
+             |tblproperties(
+             | type ='$tableType',
+             | primaryKey = 'id'
+             |)
+             |location '${tmp.getCanonicalPath}/$tableName2'
+           """.stripMargin)
+        spark.sql(s"insert into $tableName2 values(1, 'a1', 10)")
+
+        spark.sql(
+          s"""
+             |merge into $tableName2 t0
+             |using ( select 1 as id, 'a1' as name, 12 as price) s0
+             |on t0.id = s0.id
+             |when matched then update set price = s0.price
+             |""".stripMargin)
+
+        checkAnswer(s"select id, name, price from $tableName2")(
+          Seq(1, "a1", 12.0)
+        )
+      }
     }
   }
@@ -174,7 +206,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
   def validateLogBlock(basePath: String,
                        expectedNumLogFile: Int,
-                       changedFields: Seq[String]): Unit = {
+                       changedFields: Seq[Seq[String]]): Unit = {
     val hadoopConf = getDefaultHadoopConf
     val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build
@@ -197,18 +229,20 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     assertEquals(expectedNumLogFile, logFilePathList.size)
     val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
-    val logReader = new HoodieLogFileReader(
-      metaClient.getFs, new HoodieLogFile(logFilePathList.get(0)),
-      avroSchema, 1024 * 1024, true, false, false,
-      "id", null)
-    assertTrue(logReader.hasNext)
-    val logBlockHeader = logReader.next().getLogBlockHeader
-    assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
-    assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
-    val partialSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
-    val expectedPartialSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
-      avroSchema, changedFields.asJava), false)
-    assertEquals(expectedPartialSchema, partialSchema)
-    assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
+    for (i <- 0 until expectedNumLogFile) {
+      val logReader = new HoodieLogFileReader(
+        metaClient.getFs, new HoodieLogFile(logFilePathList.get(i)),
+        avroSchema, 1024 * 1024, true, false, false,
+        "id", null)
+      assertTrue(logReader.hasNext)
+      val logBlockHeader = logReader.next().getLogBlockHeader
+      assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
+      assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
+      val partialSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
+      val expectedPartialSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
+        avroSchema, changedFields(i).asJava), false)
+      assertEquals(expectedPartialSchema, partialSchema)
+      assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
+    }
   }
 }
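
The updated test drives two MERGE INTO commits against a five-field table and asserts, per log file, that the partial block schema matches a projection onto that commit's changed fields. A simplified sketch of that expectation (illustrative only; the real assertion builds Avro schemas via HoodieAvroUtils.generateProjectionSchema and adds meta fields, and the field ordering shown here is assumed):

    import java.util.ArrayList;
    import java.util.List;

    public class ExpectedPartialSchemaSketch {
      public static void main(String[] args) {
        List<String> tableFields = List.of("id", "name", "price", "_ts", "description");
        List<List<String>> changedFieldsPerLogFile = List.of(
            List.of("price", "_ts"),          // first MERGE INTO
            List.of("_ts", "description"));   // second MERGE INTO

        for (List<String> changed : changedFieldsPerLogFile) {
          List<String> projected = new ArrayList<>();
          for (String field : tableFields) {
            if (changed.contains(field)) {
              projected.add(field);           // keep only the changed fields
            }
          }
          System.out.println("expected partial block fields: " + projected);
        }
      }
    }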