nsivabalan commented on code in PR #10337:
URL: https://github.com/apache/hudi/pull/10337#discussion_r1428851434


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala:
##########
@@ -182,6 +182,82 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
     }
   }
 
+
+  /**
+   * Test MIT with global index.
+   * HUDI-7131
+   */
+  test("Test HUDI-7131") {

Review Comment:
   lets name the test w/ some context. 
   "Test MergeInto with global index" 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala:
##########
@@ -261,6 +337,65 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
     })
   }
 
+  test("Test MergeInto with changing partition") {
+    withRecordType()(withTempDir { tmp =>
+      withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE") {
+        val sourceTable = generateTableName
+        val targetTable = generateTableName
+        spark.sql(
+          s"""
+             | create table $sourceTable
+             | using parquet
+             | partitioned by (partition)
+             | location '${tmp.getCanonicalPath}/$sourceTable'
+             | as
+             | select
+             | 1 as id,
+             | 2 as version,
+             | 'yes' as mergeCond,
+             | '2023-10-02' as partition
+          """.stripMargin
+        )
+        spark.sql(s"insert into $sourceTable values(2, 2, 'no', '2023-10-02')")
+        spark.sql(s"insert into $sourceTable values(3, 1, 'insert', 
'2023-10-01')")
+
+        spark.sql(
+          s"""
+             | create table $targetTable (
+             |  id int,
+             |  version int,
+             |  mergeCond string,
+             |  partition string
+             | ) using hudi
+             | partitioned by (partition)
+             | tblproperties (
+             |    'primaryKey' = 'id',
+             |    'type' = 'cow'
+             | )
+             | location '${tmp.getCanonicalPath}/$targetTable'
+           """.stripMargin)
+
+        spark.sql(s"insert into $targetTable values(1, 1, 'insert', 
'2023-10-01')")
+        spark.sql(s"insert into $targetTable values(2, 1, 'insert', 
'2023-10-01')")
+
+        spark.sql(
+          s"""
+             | merge into $targetTable t using
+             | (select * from $sourceTable) as s
+             | on t.id=s.id
+             | when matched and s.mergeCond = 'yes' then update set *
+             | when not matched then insert *
+           """.stripMargin)
+        checkAnswer(s"select id,version,partition from $targetTable order by 
id")(
+          Seq(1, 2, "2023-10-02"),
+          Seq(2, 1, "2023-10-01"),

Review Comment:
   why s.mergeCond is required in the match clause? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -243,6 +247,56 @@ private static <R> HoodieData<HoodieRecord<R>> 
getExistingRecords(
         .getMergedRecords().iterator());
   }
 
+  /**
+   * getExistingRecords will create records with expression payload so we 
overwrite the config.
+   * Additionally, we don't want to restore this value because the write will 
fail later on.
+   * We also need the keygenerator so we can figure out the partition path 
after expression payload
+   * evaluates the merge.
+   */
+  private static BaseKeyGenerator 
maybeGetKeygenAndUpdatePayload(HoodieWriteConfig config) {
+    if 
(config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"))
 {
+      config.setValue(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key(), 
HoodiePayloadConfig.PAYLOAD_CLASS_NAME.defaultValue());
+      try {
+        return (BaseKeyGenerator) 
HoodieAvroKeyGeneratorFactory.createKeyGenerator(config.getProps());
+      } catch (IOException e) {
+        throw new RuntimeException("KeyGenerator must inherit from 
BaseKeyGenerator to update a records partition path using spark sql merge 
into", e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Special merge handling for MIT
+   * We need to wait until after merging before we can add meta fields because
+   * ExpressionPayload does not allow rewriting
+   */
+  private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecordWithExpressionPayload(
+      HoodieRecord<R> incoming,
+      HoodieRecord<R> existing,
+      Schema writeSchema,
+      Schema existingSchema,
+      Schema writeSchemaWithMetaFields,
+      HoodieWriteConfig config,
+      HoodieRecordMerger recordMerger,
+      BaseKeyGenerator keyGenerator) throws IOException {
+    Option<Pair<HoodieRecord, Schema>> mergeResult = 
recordMerger.merge(existing, existingSchema,

Review Comment:
   so, we don't need to meta fields for this merge is it?
   but for the merge in L323, we need the meta fields prepended. 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala:
##########
@@ -261,6 +337,65 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
     })
   }
 
+  test("Test MergeInto with changing partition") {
+    withRecordType()(withTempDir { tmp =>
+      withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE") {
+        val sourceTable = generateTableName
+        val targetTable = generateTableName
+        spark.sql(
+          s"""
+             | create table $sourceTable
+             | using parquet
+             | partitioned by (partition)
+             | location '${tmp.getCanonicalPath}/$sourceTable'
+             | as
+             | select
+             | 1 as id,
+             | 2 as version,
+             | 'yes' as mergeCond,
+             | '2023-10-02' as partition
+          """.stripMargin
+        )
+        spark.sql(s"insert into $sourceTable values(2, 2, 'no', '2023-10-02')")
+        spark.sql(s"insert into $sourceTable values(3, 1, 'insert', 
'2023-10-01')")
+
+        spark.sql(
+          s"""
+             | create table $targetTable (
+             |  id int,
+             |  version int,
+             |  mergeCond string,
+             |  partition string
+             | ) using hudi
+             | partitioned by (partition)
+             | tblproperties (
+             |    'primaryKey' = 'id',
+             |    'type' = 'cow'
+             | )
+             | location '${tmp.getCanonicalPath}/$targetTable'
+           """.stripMargin)
+
+        spark.sql(s"insert into $targetTable values(1, 1, 'insert', 
'2023-10-01')")
+        spark.sql(s"insert into $targetTable values(2, 1, 'insert', 
'2023-10-01')")
+
+        spark.sql(
+          s"""
+             | merge into $targetTable t using
+             | (select * from $sourceTable) as s
+             | on t.id=s.id
+             | when matched and s.mergeCond = 'yes' then update set *
+             | when not matched then insert *
+           """.stripMargin)
+        checkAnswer(s"select id,version,partition from $targetTable order by 
id")(
+          Seq(1, 2, "2023-10-02"),
+          Seq(2, 1, "2023-10-01"),
+          Seq(3, 1, "2023-10-01")
+        )
+      }
+
+    })
+  }
+

Review Comment:
   can we add a test w/ global index, but update partition path = false. 
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala:
##########
@@ -182,6 +182,82 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
     }
   }
 
+
+  /**
+   * Test MIT with global index.
+   * HUDI-7131
+   */
+  test("Test HUDI-7131") {
+    if (HoodieSparkUtils.gteqSpark3_1) {
+      withRecordType()(withTempDir { tmp =>
+        withSQLConf("hoodie.index.type" -> "GLOBAL_BLOOM") {
+          val targetTable = generateTableName
+          spark.sql(
+            s"""
+               |create table ${targetTable} (
+               |  id int,
+               |  version int,
+               |  name string,
+               |  inc_day string
+               |) using hudi
+               |tblproperties (
+               |  type = 'cow',
+               |  primaryKey = 'id'
+               | )
+               |partitioned by (inc_day)
+               |location '${tmp.getCanonicalPath}/$targetTable'
+               |""".stripMargin)
+          spark.sql(
+            s"""
+               |merge into ${targetTable} as target
+               |using (
+               |select 1 as id, 1 as version, 'str_1' as name, '2023-10-01' as 
inc_day
+               |) source
+               |on source.id = target.id
+               |when matched then
+               |update set *
+               |when not matched then
+               |insert *
+               |""".stripMargin)
+          spark.sql(
+            s"""
+               |merge into ${targetTable} as target
+               |using (
+               |select 1 as id, 2 as version, 'str_2' as name, '2023-10-01' as 
inc_day
+               |) source
+               |on source.id = target.id
+               |when matched then
+               |update set *
+               |when not matched then
+               |insert *
+               |""".stripMargin)
+
+          checkAnswer(s"select id, version, name, inc_day from $targetTable")(
+            Seq(1, 2, "str_2", "2023-10-01")
+          )
+          // migrate the record to a new partition.
+
+          spark.sql(
+            s"""
+               |merge into ${targetTable} as target
+               |using (
+               |select 1 as id, 2 as version, 'str_2' as name, '2023-10-02' as 
inc_day
+               |) source
+               |on source.id = target.id
+               |when matched then
+               |update set *
+               |when not matched then
+               |insert *
+               |""".stripMargin)
+
+          checkAnswer(s"select id, version, name, inc_day from $targetTable")(
+            Seq(1, 2, "str_2", "2023-10-02")
+          )
+        }
+      })
+    }
+  }

Review Comment:
   can we reset the index type in the end



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala:
##########
@@ -261,6 +337,65 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
     })
   }
 
+  test("Test MergeInto with changing partition") {
+    withRecordType()(withTempDir { tmp =>
+      withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE") {

Review Comment:
   can we parametrize this for both GLOBAL_BLOOM and GLOBAL_SIMPLE



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -243,6 +247,56 @@ private static <R> HoodieData<HoodieRecord<R>> 
getExistingRecords(
         .getMergedRecords().iterator());
   }
 
+  /**
+   * getExistingRecords will create records with expression payload so we 
overwrite the config.
+   * Additionally, we don't want to restore this value because the write will 
fail later on.
+   * We also need the keygenerator so we can figure out the partition path 
after expression payload
+   * evaluates the merge.
+   */
+  private static BaseKeyGenerator 
maybeGetKeygenAndUpdatePayload(HoodieWriteConfig config) {
+    if 
(config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"))
 {
+      config.setValue(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key(), 
HoodiePayloadConfig.PAYLOAD_CLASS_NAME.defaultValue());

Review Comment:
   what incase we switch the default payload class later? should we instead 
hardcode the payload class.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -278,6 +337,7 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
    */
   public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdatesIfNeeded(
       HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations, HoodieWriteConfig config, HoodieTable hoodieTable) 
{
+    final BaseKeyGenerator keyGenerator = 
maybeGetKeygenAndUpdatePayload(config);

Review Comment:
   if its may be, then lets use Option<>



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala:
##########
@@ -261,6 +337,65 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
     })
   }
 
+  test("Test MergeInto with changing partition") {
+    withRecordType()(withTempDir { tmp =>
+      withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE") {
+        val sourceTable = generateTableName
+        val targetTable = generateTableName
+        spark.sql(
+          s"""
+             | create table $sourceTable
+             | using parquet
+             | partitioned by (partition)
+             | location '${tmp.getCanonicalPath}/$sourceTable'
+             | as
+             | select
+             | 1 as id,
+             | 2 as version,
+             | 'yes' as mergeCond,
+             | '2023-10-02' as partition
+          """.stripMargin
+        )
+        spark.sql(s"insert into $sourceTable values(2, 2, 'no', '2023-10-02')")
+        spark.sql(s"insert into $sourceTable values(3, 1, 'insert', 
'2023-10-01')")
+
+        spark.sql(
+          s"""
+             | create table $targetTable (
+             |  id int,
+             |  version int,
+             |  mergeCond string,
+             |  partition string
+             | ) using hudi
+             | partitioned by (partition)
+             | tblproperties (
+             |    'primaryKey' = 'id',
+             |    'type' = 'cow'
+             | )
+             | location '${tmp.getCanonicalPath}/$targetTable'
+           """.stripMargin)
+
+        spark.sql(s"insert into $targetTable values(1, 1, 'insert', 
'2023-10-01')")
+        spark.sql(s"insert into $targetTable values(2, 1, 'insert', 
'2023-10-01')")
+
+        spark.sql(
+          s"""
+             | merge into $targetTable t using
+             | (select * from $sourceTable) as s
+             | on t.id=s.id
+             | when matched and s.mergeCond = 'yes' then update set *
+             | when not matched then insert *
+           """.stripMargin)
+        checkAnswer(s"select id,version,partition from $targetTable order by 
id")(
+          Seq(1, 2, "2023-10-02"),
+          Seq(2, 1, "2023-10-01"),
+          Seq(3, 1, "2023-10-01")
+        )
+      }
+
+    })

Review Comment:
   lets reset index type



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java:
##########
@@ -52,13 +52,15 @@
 public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, 
K, O> {
 
   protected final Schema readerSchema;
+  protected final Schema baseFileReaderSchema;
 
   public HoodieMergedReadHandle(HoodieWriteConfig config,
                                 Option<String> instantTime,
                                 HoodieTable<T, I, K, O> hoodieTable,
                                 Pair<String, String> partitionPathFileIDPair) {
     super(config, instantTime, hoodieTable, partitionPathFileIDPair);
     readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
+    baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getWriteSchema()), 
config.allowOperationMetadataField());

Review Comment:
   can we add java docs to call out why we use writeSchema instead of just 
getSchema? 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java:
##########
@@ -67,7 +67,7 @@ protected List<WriteStatus> computeNext() {
     // Executor service used for launching writer thread.
     HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null;
     try {
-      Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
+      Schema schema = new Schema.Parser().parse(hoodieConfig.getWriteSchema());

Review Comment:
   can we add java docs to call out why we use writeSchema instead of just 
getSchema



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -251,9 +305,14 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
       HoodieRecord<R> existing,
       Schema writeSchema,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger) throws IOException {
+      HoodieRecordMerger recordMerger,
+      BaseKeyGenerator keyGenerator) throws IOException {
     Schema existingSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
     Schema writeSchemaWithMetaFields = 
HoodieAvroUtils.addMetadataFields(writeSchema, 
config.allowOperationMetadataField());
+    if (keyGenerator != null) {

Review Comment:
   oh, just now looked at the lineage of this. 
   I am not sure if we should think of a proper fix. 
   how can we be sure that overwrite w/ latest payload works.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -243,6 +247,56 @@ private static <R> HoodieData<HoodieRecord<R>> 
getExistingRecords(
         .getMergedRecords().iterator());
   }
 
+  /**
+   * getExistingRecords will create records with expression payload so we 
overwrite the config.
+   * Additionally, we don't want to restore this value because the write will 
fail later on.
+   * We also need the keygenerator so we can figure out the partition path 
after expression payload
+   * evaluates the merge.
+   */
+  private static BaseKeyGenerator 
maybeGetKeygenAndUpdatePayload(HoodieWriteConfig config) {
+    if 
(config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"))
 {
+      config.setValue(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key(), 
HoodiePayloadConfig.PAYLOAD_CLASS_NAME.defaultValue());
+      try {
+        return (BaseKeyGenerator) 
HoodieAvroKeyGeneratorFactory.createKeyGenerator(config.getProps());
+      } catch (IOException e) {
+        throw new RuntimeException("KeyGenerator must inherit from 
BaseKeyGenerator to update a records partition path using spark sql merge 
into", e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Special merge handling for MIT
+   * We need to wait until after merging before we can add meta fields because
+   * ExpressionPayload does not allow rewriting
+   */
+  private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecordWithExpressionPayload(
+      HoodieRecord<R> incoming,
+      HoodieRecord<R> existing,
+      Schema writeSchema,
+      Schema existingSchema,
+      Schema writeSchemaWithMetaFields,
+      HoodieWriteConfig config,
+      HoodieRecordMerger recordMerger,
+      BaseKeyGenerator keyGenerator) throws IOException {
+    Option<Pair<HoodieRecord, Schema>> mergeResult = 
recordMerger.merge(existing, existingSchema,
+        incoming, writeSchemaWithMetaFields, config.getProps());
+    if (!mergeResult.isPresent()) {
+      return Option.empty();
+    }
+    HoodieRecord<R> result = mergeResult.get().getLeft();
+    if (result.getData().equals(HoodieRecord.SENTINEL)) {
+      return Option.of(result);
+    }
+    String partitionPath = keyGenerator.getPartitionPath((GenericRecord) 
result.getData());
+    HoodieRecord<R> withMeta = result.prependMetaFields(writeSchema, 
writeSchemaWithMetaFields,
+            new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath),
 config.getProps());

Review Comment:
   can we declare this once and use it for all records. Looks like we are 
creating this N times for N records. 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -251,9 +305,14 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
       HoodieRecord<R> existing,
       Schema writeSchema,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger) throws IOException {
+      HoodieRecordMerger recordMerger,
+      BaseKeyGenerator keyGenerator) throws IOException {
     Schema existingSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
     Schema writeSchemaWithMetaFields = 
HoodieAvroUtils.addMetadataFields(writeSchema, 
config.allowOperationMetadataField());
+    if (keyGenerator != null) {

Review Comment:
   we should probably avoid this.
   can we check the payload config value and match it to expression payload and 
then route the call accordingly. 
   just by looking at the code, its not obvious that keygen will be set only in 
case of expression payload. 
   and its not future proof. what in case someone instantiates key gen at the 
caller side later down the line 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to