nsivabalan commented on code in PR #10337:
URL: https://github.com/apache/hudi/pull/10337#discussion_r1761365042
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -245,6 +250,59 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
         .getMergedRecords().iterator());
   }
+  /**
+   * getExistingRecords will create records with ExpressionPayload, so we overwrite the config.
+   * Additionally, we don't want to restore this value because the write would fail later on.
+   * We also need the key generator so we can figure out the partition path after ExpressionPayload
+   * evaluates the merge.
+   */
+  private static Option<Pair<BaseKeyGenerator, HoodieWriteConfig>> maybeGetKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
Review Comment:
   Can we change the signature a bit so that it's easier to manage?
   Let maybeGetKeygenAndUpdatedWriteConfig return Pair<Option<BaseKeyGenerator>, HoodieWriteConfig>.
   The first entry in the return will only be set for expression payload, while the 2nd entry reflects the updated write config.
   The caller is expected to use this write config irrespective of whether it's expression payload or not.
   For example,
```
HoodieData<HoodieRecord<R>> existingRecords = getExistingRecords(globalLocations,
    keyGeneratorWriteConfigOpt.isPresent() ? keyGeneratorWriteConfigOpt.get().getRight() : config, hoodieTable);
```
we could simplify this line to
```
HoodieData<HoodieRecord<R>> existingRecords = getExistingRecords(globalLocations,
    keyGeneratorWriteConfigOpt.getRight(), hoodieTable);
```
And make mergeIncomingWithExistingRecord() take in Option<BaseKeyGenerator> as the last arg, which will only be set for expression payload.
This makes it look cleaner. wdyt?
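
   For illustration, here is a minimal self-contained sketch of the suggested shape. All types here are stand-ins, not Hudi classes: Java's `Optional` and `AbstractMap.SimpleEntry` stand in for Hudi's `Option` and `Pair`, and the class and field names are hypothetical.
   ```java
   import java.util.AbstractMap.SimpleEntry;
   import java.util.Map.Entry;
   import java.util.Optional;

   // Sketch: the right side (write config) is always present, so callers use it
   // unconditionally; the left side (key generator) is set only for ExpressionPayload.
   class KeygenWriteConfigSketch {
     static class WriteConfig {
       final String payloadClass;
       WriteConfig(String payloadClass) { this.payloadClass = payloadClass; }
     }
     static class KeyGen { }

     static final String EXPRESSION_PAYLOAD =
         "org.apache.spark.sql.hudi.command.payload.ExpressionPayload";

     static Entry<Optional<KeyGen>, WriteConfig> getKeygenAndUpdatedWriteConfig(WriteConfig config) {
       if (EXPRESSION_PAYLOAD.equals(config.payloadClass)) {
         // hypothetical: swap in the table's payload class for reading existing records
         return new SimpleEntry<>(Optional.of(new KeyGen()), new WriteConfig("table.DefaultPayload"));
       }
       // non-expression-payload case: no key generator, original config passed through
       return new SimpleEntry<>(Optional.empty(), config);
     }

     public static void main(String[] args) {
       Entry<Optional<KeyGen>, WriteConfig> plain =
           getKeygenAndUpdatedWriteConfig(new WriteConfig("some.OtherPayload"));
       // the caller reads getValue() without a ternary, whatever the payload type
       if (plain.getKey().isPresent() || !"some.OtherPayload".equals(plain.getValue().payloadClass)) {
         throw new AssertionError();
       }
       Entry<Optional<KeyGen>, WriteConfig> exprs =
           getKeygenAndUpdatedWriteConfig(new WriteConfig(EXPRESSION_PAYLOAD));
       if (!exprs.getKey().isPresent()) {
         throw new AssertionError();
       }
       System.out.println("ok");
     }
   }
   ```
   The point of the shape: the `Optional` wraps only the part that is genuinely conditional, so the "which config do I use" ternary disappears from every call site.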
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -263,6 +340,137 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
     })
}
+  test("Test MergeInto with changing partition and global index") {
+    withRecordType()(withTempDir { tmp =>
+      withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE") {
+        Seq("cow", "mor").foreach { tableType => {
+          val sourceTable = generateTableName
+          val targetTable = generateTableName
+          spark.sql(
+            s"""
+               | create table $sourceTable
+               | using parquet
+               | partitioned by (partition)
+               | location '${tmp.getCanonicalPath}/$sourceTable'
+               | as
+               | select
+               |   1 as id,
+               |   2 as version,
+               |   'yes' as mergeCond,
+               |   '2023-10-02' as partition
+            """.stripMargin
+          )
+          spark.sql(s"insert into $sourceTable values(2, 2, 'no', '2023-10-02')")
+          spark.sql(s"insert into $sourceTable values(3, 1, 'insert', '2023-10-01')")
+
+          spark.sql(
+            s"""
+               | create table $targetTable (
+               |   id int,
+               |   version int,
+               |   mergeCond string,
+               |   partition string
+               | ) using hudi
+               | partitioned by (partition)
+               | tblproperties (
+               |   'primaryKey' = 'id',
+               |   'type' = '$tableType',
+               |   'payloadClass' = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
+               |   'payloadType' = 'CUSTOM',
+               |   preCombineField = 'version'
+               | )
+               | location '${tmp.getCanonicalPath}/$targetTable'
+            """.stripMargin)
+
+          spark.sql(s"insert into $targetTable values(1, 1, 'insert', '2023-10-01')")
+          spark.sql(s"insert into $targetTable values(2, 3, 'insert', '2023-10-01')")
+
+          spark.sql(
+            s"""
+               | merge into $targetTable t using
+               | (select * from $sourceTable) as s
+               | on t.id=s.id
+               | when matched and s.mergeCond = 'yes' then update set *
+               | when not matched then insert *
+            """.stripMargin)
+          checkAnswer(s"select id,version,_hoodie_partition_path from $targetTable order by id")(
+            Seq(1, 2, "partition=2023-10-02"),
Review Comment:
   Not sure I understand the expected value here.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -253,25 +311,31 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
       HoodieRecord<R> existing,
       Schema writeSchema,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger) throws IOException {
+      HoodieRecordMerger recordMerger,
+      Option<Pair<BaseKeyGenerator, HoodieWriteConfig>> keyGeneratorWriteConfigOpt) throws IOException {
     Schema existingSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
     Schema writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
-    // prepend the hoodie meta fields as the incoming record does not have them
-    HoodieRecord incomingPrepended = incoming
-        .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
-    // after prepend the meta fields, convert the record back to the original payload
-    HoodieRecord incomingWithMetaFields = incomingPrepended
-        .wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty());
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
-        .merge(existing, existingSchema, incomingWithMetaFields, writeSchemaWithMetaFields, config.getProps());
-    if (mergeResult.isPresent()) {
-      // the merged record needs to be converted back to the original payload
-      HoodieRecord<R> merged = mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
-          writeSchemaWithMetaFields, config.getProps(), Option.empty(),
-          config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema));
-      return Option.of(merged);
+    if (keyGeneratorWriteConfigOpt.isPresent()) {
Review Comment:
   How can we guarantee that this will be non-empty only in the case of expression payload? Or at least, can we name the arg accordingly, sth like keyGenWriteConfForExprsnPayloadOpt?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -263,6 +340,137 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
     })
}
+  test("Test MergeInto with changing partition and global index") {
+    withRecordType()(withTempDir { tmp =>
+      withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE") {
+        Seq("cow", "mor").foreach { tableType => {
+          val sourceTable = generateTableName
+          val targetTable = generateTableName
+          spark.sql(
+            s"""
+               | create table $sourceTable
+               | using parquet
+               | partitioned by (partition)
+               | location '${tmp.getCanonicalPath}/$sourceTable'
+               | as
+               | select
+               |   1 as id,
+               |   2 as version,
+               |   'yes' as mergeCond,
+               |   '2023-10-02' as partition
+            """.stripMargin
+          )
+          spark.sql(s"insert into $sourceTable values(2, 2, 'no', '2023-10-02')")
Review Comment:
   Can we make the tests simpler to read? For example, why do we insert into different partitions?
   Could we insert into the same partition for all 3 record keys, then later ingest one of the records to a new partition, the 2nd one to the same partition, and leave the 3rd untouched, and also insert a new record key?
   The current test is a bit hard to follow.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -245,6 +250,59 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
         .getMergedRecords().iterator());
   }
+  /**
+   * getExistingRecords will create records with ExpressionPayload, so we overwrite the config.
+   * Additionally, we don't want to restore this value because the write would fail later on.
+   * We also need the key generator so we can figure out the partition path after ExpressionPayload
+   * evaluates the merge.
+   */
+  private static Option<Pair<BaseKeyGenerator, HoodieWriteConfig>> maybeGetKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    if (config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload")) {
+      TypedProperties typedProperties = new TypedProperties(config.getProps());
+      // set the payload class to the table's payload class, not expression payload. This will be used to read the existing records
+      typedProperties.setProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), tableConfig.getPayloadClass());
+      typedProperties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), tableConfig.getPayloadClass());
+      HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withProperties(typedProperties).build();
+      try {
+        return Option.of(Pair.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()), writeConfig));
+      } catch (IOException e) {
+        throw new RuntimeException("KeyGenerator must inherit from BaseKeyGenerator to update a record's partition path using spark sql merge into", e);
+      }
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Special merge handling for MIT.
+   * We need to wait until after merging before we can add meta fields because
+   * ExpressionPayload does not allow rewriting.
+   */
+  private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithExpressionPayload(
+      HoodieRecord<R> incoming,
+      HoodieRecord<R> existing,
+      Schema writeSchema,
+      Schema existingSchema,
+      Schema writeSchemaWithMetaFields,
+      HoodieWriteConfig config,
+      HoodieRecordMerger recordMerger,
+      BaseKeyGenerator keyGenerator) throws IOException {
+    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(existing, existingSchema,
+        incoming, writeSchemaWithMetaFields, config.getProps());
+    if (!mergeResult.isPresent()) {
+      return Option.empty();
+    }
+    HoodieRecord<R> result = mergeResult.get().getLeft();
+    if (result.getData().equals(HoodieRecord.SENTINEL)) {
Review Comment:
   Can we add comments on these SENTINELs?
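
   For context, a generic sketch of the sentinel idiom that such a comment would document, under the assumption that SENTINEL here marks a record for which no merge condition matched (distinct from both a real merged value and a delete). The classes and names below are illustrative stand-ins, not Hudi's actual types.
   ```java
   import java.util.Optional;

   // Sentinel idiom: a single shared marker object whose identity (not its
   // contents) means "no merge condition matched, leave the existing record as-is".
   class SentinelSketch {
     static final Object SENTINEL = new Object();

     // hypothetical merge: yields SENTINEL when no update condition applies
     static Object merge(int existing, int incoming, boolean conditionMatched) {
       return conditionMatched ? Integer.valueOf(existing + incoming) : SENTINEL;
     }

     static Optional<Integer> handle(Object mergeResult) {
       if (mergeResult.equals(SENTINEL)) {
         // sentinel means "no-op": caller keeps the existing record untouched
         return Optional.empty();
       }
       return Optional.of((Integer) mergeResult);
     }

     public static void main(String[] args) {
       if (handle(merge(1, 2, false)).isPresent()) throw new AssertionError();
       if (handle(merge(1, 2, true)).get() != 3) throw new AssertionError();
       System.out.println("ok");
     }
   }
   ```
   A one-line comment of this kind next to each SENTINEL check would answer the question the reviewer raises.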
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -253,25 +311,31 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
       HoodieRecord<R> existing,
       Schema writeSchema,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger) throws IOException {
+      HoodieRecordMerger recordMerger,
+      Option<Pair<BaseKeyGenerator, HoodieWriteConfig>> keyGeneratorWriteConfigOpt) throws IOException {
     Schema existingSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
     Schema writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
-    // prepend the hoodie meta fields as the incoming record does not have them
-    HoodieRecord incomingPrepended = incoming
-        .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
-    // after prepend the meta fields, convert the record back to the original payload
-    HoodieRecord incomingWithMetaFields = incomingPrepended
-        .wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty());
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
-        .merge(existing, existingSchema, incomingWithMetaFields, writeSchemaWithMetaFields, config.getProps());
-    if (mergeResult.isPresent()) {
-      // the merged record needs to be converted back to the original payload
-      HoodieRecord<R> merged = mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
-          writeSchemaWithMetaFields, config.getProps(), Option.empty(),
-          config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema));
-      return Option.of(merged);
+    if (keyGeneratorWriteConfigOpt.isPresent()) {
Review Comment:
   Also, let's fix the naming of "maybeGetKeygenAndUpdatedWriteConfig" to reflect that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]