nsivabalan commented on code in PR #10337:
URL: https://github.com/apache/hudi/pull/10337#discussion_r1761365042


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -245,6 +250,59 @@ private static <R> HoodieData<HoodieRecord<R>> 
getExistingRecords(
         .getMergedRecords().iterator());
   }
 
+  /**
+   * getExistingRecords will create records with expression payload so we 
overwrite the config.
+   * Additionally, we don't want to restore this value because the write will 
fail later on.
+   * We also need the keygenerator so we can figure out the partition path 
after expression payload
+   * evaluates the merge.
+   */
+  private static Option<Pair<BaseKeyGenerator, HoodieWriteConfig>> 
maybeGetKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig 
tableConfig) {

Review Comment:
   can we change the signature a bit so that its easier to manage. 
   
   Let maybeGetKeygenAndUpdatedWriteConfig return 
Pair<Option<BaseKeyGenerator>, HoodieWriteConfig>
   
   first entry in the return will only be set for expression payload. 
   while 2nd entry reflects the updatedWriteConfig. 
   caller is expected to use this write config irrespective of wether its 
expression payload or not. 
   
   for eg, 
   ```
       HoodieData<HoodieRecord<R>> existingRecords = 
getExistingRecords(globalLocations,
           keyGeneratorWriteConfigOpt.isPresent() ? 
keyGeneratorWriteConfigOpt.get().getRight() : config, hoodieTable);
   ```
   
   we could simplify this line to 
   ```
       HoodieData<HoodieRecord<R>> existingRecords = 
getExistingRecords(globalLocations, 
keyGeneratorWriteConfigOpt.get().getRight(), hoodieTable);
   ```
   
   And make mergeIncomingWithExistingRecord() take in Option<BaseKeyGenerator> 
as last arg which will only be set for expression payload. 
   
   this makes is look cleaner. wdyt



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -263,6 +340,137 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
     })
   }
 
+  test("Test MergeInto with changing partition and global index") {
+    withRecordType()(withTempDir { tmp =>
+      withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE") {
+        Seq("cow","mor").foreach { tableType => {
+          val sourceTable = generateTableName
+          val targetTable = generateTableName
+          spark.sql(
+            s"""
+               | create table $sourceTable
+               | using parquet
+               | partitioned by (partition)
+               | location '${tmp.getCanonicalPath}/$sourceTable'
+               | as
+               | select
+               | 1 as id,
+               | 2 as version,
+               | 'yes' as mergeCond,
+               | '2023-10-02' as partition
+            """.stripMargin
+          )
+          spark.sql(s"insert into $sourceTable values(2, 2, 'no', 
'2023-10-02')")
+          spark.sql(s"insert into $sourceTable values(3, 1, 'insert', 
'2023-10-01')")
+
+          spark.sql(
+            s"""
+               | create table $targetTable (
+               |  id int,
+               |  version int,
+               |  mergeCond string,
+               |  partition string
+               | ) using hudi
+               | partitioned by (partition)
+               | tblproperties (
+               |    'primaryKey' = 'id',
+               |    'type' = '$tableType',
+               |    'payloadClass' = 
'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
+               |    'payloadType' = 'CUSTOM',
+               |    preCombineField = 'version'
+               | )
+               | location '${tmp.getCanonicalPath}/$targetTable'
+             """.stripMargin)
+
+          spark.sql(s"insert into $targetTable values(1, 1, 'insert', 
'2023-10-01')")
+          spark.sql(s"insert into $targetTable values(2, 3, 'insert', 
'2023-10-01')")
+
+          spark.sql(
+            s"""
+               | merge into $targetTable t using
+               | (select * from $sourceTable) as s
+               | on t.id=s.id
+               | when matched and s.mergeCond = 'yes' then update set *
+               | when not matched then insert *
+             """.stripMargin)
+          checkAnswer(s"select id,version,_hoodie_partition_path from 
$targetTable order by id")(
+            Seq(1, 2, "partition=2023-10-02"),

Review Comment:
   not sure I understand the expected value here



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -253,25 +311,31 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
       HoodieRecord<R> existing,
       Schema writeSchema,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger) throws IOException {
+      HoodieRecordMerger recordMerger,
+      Option<Pair<BaseKeyGenerator, HoodieWriteConfig>> 
keyGeneratorWriteConfigOpt) throws IOException {
     Schema existingSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
     Schema writeSchemaWithMetaFields = 
HoodieAvroUtils.addMetadataFields(writeSchema, 
config.allowOperationMetadataField());
-    // prepend the hoodie meta fields as the incoming record does not have them
-    HoodieRecord incomingPrepended = incoming
-        .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
 config.getProps());
-    // after prepend the meta fields, convert the record back to the original 
payload
-    HoodieRecord incomingWithMetaFields = incomingPrepended
-        .wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(), 
Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, 
Option.empty());
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
-        .merge(existing, existingSchema, incomingWithMetaFields, 
writeSchemaWithMetaFields, config.getProps());
-    if (mergeResult.isPresent()) {
-      // the merged record needs to be converted back to the original payload
-      HoodieRecord<R> merged = 
mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
-          writeSchemaWithMetaFields, config.getProps(), Option.empty(),
-          config.allowOperationMetadataField(), Option.empty(), false, 
Option.of(writeSchema));
-      return Option.of(merged);
+    if (keyGeneratorWriteConfigOpt.isPresent()) {

Review Comment:
   how can we guarantee that this will be non empty only in case of expression 
payload? or atleast can we name the arg accordingly. sth like, 
keyGenWriteConfForExprsnPayloadOpt 
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -263,6 +340,137 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
     })
   }
 
+  test("Test MergeInto with changing partition and global index") {
+    withRecordType()(withTempDir { tmp =>
+      withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE") {
+        Seq("cow","mor").foreach { tableType => {
+          val sourceTable = generateTableName
+          val targetTable = generateTableName
+          spark.sql(
+            s"""
+               | create table $sourceTable
+               | using parquet
+               | partitioned by (partition)
+               | location '${tmp.getCanonicalPath}/$sourceTable'
+               | as
+               | select
+               | 1 as id,
+               | 2 as version,
+               | 'yes' as mergeCond,
+               | '2023-10-02' as partition
+            """.stripMargin
+          )
+          spark.sql(s"insert into $sourceTable values(2, 2, 'no', 
'2023-10-02')")

Review Comment:
   can we make tests simpler to read. 
   for eg, why we insert into diff partitions. 
   can we insert to same partition for all 3 record keys. 
   and later ingest 1 of the record to new partition. 2nd one to same partition 
and leave the 3rd w/o touching. and also insert a new record key. 
   
   the cur test is bit hard to follow



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -245,6 +250,59 @@ private static <R> HoodieData<HoodieRecord<R>> 
getExistingRecords(
         .getMergedRecords().iterator());
   }
 
+  /**
+   * getExistingRecords will create records with expression payload so we 
overwrite the config.
+   * Additionally, we don't want to restore this value because the write will 
fail later on.
+   * We also need the keygenerator so we can figure out the partition path 
after expression payload
+   * evaluates the merge.
+   */
+  private static Option<Pair<BaseKeyGenerator, HoodieWriteConfig>> 
maybeGetKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig 
tableConfig) {
+    if 
(config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"))
 {
+      TypedProperties typedProperties = new TypedProperties(config.getProps());
+      // set the payload class to table's payload class and not expresison 
payload. this will be used to read the existing records
+      
typedProperties.setProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), 
tableConfig.getPayloadClass());
+      typedProperties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
tableConfig.getPayloadClass());
+      HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withProperties(typedProperties).build();
+      try {
+        return Option.of(Pair.of((BaseKeyGenerator) 
HoodieAvroKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()), 
writeConfig));
+      } catch (IOException e) {
+        throw new RuntimeException("KeyGenerator must inherit from 
BaseKeyGenerator to update a records partition path using spark sql merge 
into", e);
+      }
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Special merge handling for MIT
+   * We need to wait until after merging before we can add meta fields because
+   * ExpressionPayload does not allow rewriting
+   */
+  private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecordWithExpressionPayload(
+      HoodieRecord<R> incoming,
+      HoodieRecord<R> existing,
+      Schema writeSchema,
+      Schema existingSchema,
+      Schema writeSchemaWithMetaFields,
+      HoodieWriteConfig config,
+      HoodieRecordMerger recordMerger,
+      BaseKeyGenerator keyGenerator) throws IOException {
+    Option<Pair<HoodieRecord, Schema>> mergeResult = 
recordMerger.merge(existing, existingSchema,
+        incoming, writeSchemaWithMetaFields, config.getProps());
+    if (!mergeResult.isPresent()) {
+      return Option.empty();
+    }
+    HoodieRecord<R> result = mergeResult.get().getLeft();
+    if (result.getData().equals(HoodieRecord.SENTINEL)) {

Review Comment:
   can we add comments on these SENTINELs 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -253,25 +311,31 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
       HoodieRecord<R> existing,
       Schema writeSchema,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger) throws IOException {
+      HoodieRecordMerger recordMerger,
+      Option<Pair<BaseKeyGenerator, HoodieWriteConfig>> 
keyGeneratorWriteConfigOpt) throws IOException {
     Schema existingSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
     Schema writeSchemaWithMetaFields = 
HoodieAvroUtils.addMetadataFields(writeSchema, 
config.allowOperationMetadataField());
-    // prepend the hoodie meta fields as the incoming record does not have them
-    HoodieRecord incomingPrepended = incoming
-        .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
 config.getProps());
-    // after prepend the meta fields, convert the record back to the original 
payload
-    HoodieRecord incomingWithMetaFields = incomingPrepended
-        .wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(), 
Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, 
Option.empty());
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
-        .merge(existing, existingSchema, incomingWithMetaFields, 
writeSchemaWithMetaFields, config.getProps());
-    if (mergeResult.isPresent()) {
-      // the merged record needs to be converted back to the original payload
-      HoodieRecord<R> merged = 
mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
-          writeSchemaWithMetaFields, config.getProps(), Option.empty(),
-          config.allowOperationMetadataField(), Option.empty(), false, 
Option.of(writeSchema));
-      return Option.of(merged);
+    if (keyGeneratorWriteConfigOpt.isPresent()) {

Review Comment:
   also lets fix the naming of "maybeGetKeygenAndUpdatedWriteConfig" to reflect 
that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to