ssandona commented on issue #9870:
URL: https://github.com/apache/hudi/issues/9870#issuecomment-1764673796
@ad1happy2go I was able to create a reproducible example. With Hudi
0.13.1 this produces duplicates. Here is the code:
```
from pyspark.sql.functions import col, concat, lit, max, min, substring
# `boundaries` holds (min_key, max_key) record-key pairs — one tuple per
# parquet file we want the test to produce (100 in total). Keys look like
# "<timestamp:12>_<suffix>" with a trailing 3-digit numeric part that the
# loop below decrements to fabricate a key strictly inside each range.
boundaries=[
("202001010800_W01D000","202001011142_W08D510"),
("202001011142_W08D511","202001011527_W09D191"),
("202001011527_W09D192","202001011918_W06D060"),
("202001011918_W06D061","202001021035_W07D398"),
("202001021035_W07D399","202001021446_W06D323"),
("202001021446_W06D324","202001021849_W10D611"),
("202001021849_W10D612","202001031032_W09D093"),
("202001031032_W09D094","202001031421_W02D642"),
("202001031421_W02D643","202001031819_W04D820"),
("202001031819_W04D821","202001040942_W04D517"),
("202001040942_W04D518","202001041339_W05D194"),
("202001041339_W05D195","202001041750_W04D705"),
("202001041750_W04D706","202001050943_W08D747"),
("202001050943_W08D748","202001051334_W03D323"),
("202001051334_W03D324","202001051734_W07D131"),
("202001051734_W07D132","202001060834_W06D931"),
("202001060834_W06D932","202001061214_W08D560"),
("202001061214_W08D561","202001061609_W03D994"),
("202001061609_W03D995","202001061936_W06D214"),
("202001061936_W06D215","202001071058_W01D516"),
("202001071058_W01D517","202001071433_W04D787"),
("202001071433_W04D788","202001071816_W02D479"),
("202001071816_W02D480","202001080943_W09D760"),
("202001080943_W09D761","202001081321_W09D203"),
("202001081321_W09D204","202001081709_W09D613"),
("202001081709_W09D614","202001090832_W04D040"),
("202001090832_W04D041","202001091218_W03D112"),
("202001091218_W03D113","202001091617_W04D122"),
("202001091617_W04D123","202001100813_W05D845"),
("202001100813_W05D846","202001101159_W02D784"),
("202001101159_W02D785","202001101602_W08D300"),
("202001101602_W08D301","202001101933_W04D707"),
("202001101933_W04D708","202001111117_W08D334"),
("202001111117_W08D335","202001111445_W09D842"),
("202001111445_W09D843","202001111839_W02D197"),
("202001111839_W02D198","202001121000_W09D790"),
("202001121000_W09D791","202001121402_W09D191"),
("202001121402_W09D192","202001121758_W08D490"),
("202001121758_W08D491","202001130945_W04D736"),
("202001130945_W04D737","202001131347_W08D641"),
("202001131347_W08D642","202001131709_W04D193"),
("202001131709_W04D194","202001140838_W08D085"),
("202001140838_W08D086","202001141239_W06D624"),
("202001141239_W06D625","202001141631_W01D250"),
("202001141631_W01D251","202001142037_W08D870"),
# NOTE(review): small gap here (...D870 -> ...D872); presumably intentional
# test data — the decremented in-between key (...D871) still falls in range.
("202001142037_W08D872","202001151148_W03D994"),
("202001151148_W03D995","202001151548_W09D868"),
("202001151548_W09D869","202001151938_W01D811"),
("202001151938_W01D812","202001161106_W01D647"),
("202001161106_W01D648","202001161444_W08D145"),
("202001161444_W08D146","202001161818_W09D040"),
("202001161818_W09D041","202001170948_W09D291"),
("202001170948_W09D292","202001171340_W05D108"),
("202001171340_W05D109","202001171737_W01D174"),
("202001171737_W01D175","202001180857_W04D157"),
("202001180857_W04D158","202001181254_W03D190"),
("202001181254_W03D191","202001181654_W08D119"),
("202001181654_W08D120","202001190808_W05D553"),
("202001190808_W05D554","202001191148_W09D515"),
("202001191148_W09D516","202001191521_W10D747"),
("202001191521_W10D748","202001191911_W03D566"),
("202001191911_W03D567","202001201041_W10D180"),
("202001201041_W10D181","202001201438_W02D444"),
("202001201438_W02D445","202001201840_W09D433"),
("202001201840_W09D434","202001210954_W10D580"),
("202001210954_W10D581","202001211334_W04D053"),
("202001211334_W04D054","202001211713_W03D322"),
("202001211713_W03D323","202001220857_W08D820"),
("202001220857_W08D821","202001221238_W09D113"),
("202001221238_W09D114","202001221643_W02D631"),
("202001221643_W02D632","202001230811_W03D917"),
("202001230811_W03D918","202001231227_W03D824"),
("202001231227_W03D825","202001231636_W05D331"),
("202001231636_W05D332","202001232012_W06D780"),
("202001232012_W06D781","202001241110_W03D861"),
("202001241110_W03D862","202001241457_W03D479"),
("202001241457_W03D480","202001241836_W02D045"),
("202001241836_W02D046","202001251012_W02D123"),
("202001251012_W02D124","202001251412_W08D887"),
("202001251412_W08D888","202001251753_W03D975"),
("202001251753_W03D976","202001260910_W09D539"),
("202001260910_W09D540","202001261310_W03D550"),
("202001261310_W03D551","202001261657_W06D010"),
("202001261657_W06D011","202001270819_W04D626"),
("202001270819_W04D627","202001271225_W01D259"),
("202001271225_W01D260","202001271627_W07D108"),
("202001271627_W07D109","202001272020_W01D489"),
("202001272020_W01D490","202001281138_W02D526"),
("202001281138_W02D527","202001281535_W06D808"),
("202001281535_W06D809","202001281925_W07D727"),
("202001281925_W07D728","202001291042_W02D324"),
("202001291042_W02D325","202001291455_W05D184"),
("202001291455_W05D185","202001291832_W04D872"),
("202001291832_W04D873","202001300952_W08D536"),
("202001300952_W08D537","202001301333_W02D246"),
("202001301333_W02D247","202001301701_W06D724"),
("202001301701_W06D725","202001310854_W10D030"),
("202001310854_W10D031","202001311236_W01D447"),
("202001311236_W01D448","202001311625_W05D671"),
("202001311625_W05D672","202001312101_W10D776")
]
# Table identity / layout constants for the reproduction table.
COW_TABLE_NAME="table_duplicates"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
COW_TABLE_LOCATION="s3://mybucket/datasets/table_duplicates/"
# Hudi writer/reader options: COPY_ON_WRITE table with a BLOOM index that is
# served from the metadata table (bloom-filter and column-stats indexes
# enabled), hive-style partitioning and HMS sync.
hudi_options_opt = {
"hoodie.table.name": COW_TABLE_NAME,
"hoodie.table.type": "COPY_ON_WRITE",
"hoodie.index.type": "BLOOM",
"hoodie.datasource.write.recordkey.field": "id",
"hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
"hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"hoodie.metadata.enable": "true",
"hoodie.metadata.index.bloom.filter.enable": "true",
"hoodie.bloom.index.use.metadata": "true",
"hoodie.metadata.index.column.stats.enable": "true",
"hoodie.metadata.index.column.stats.column.list": "id,timestamp,value",
# -1 disables small-file handling so every insert creates its own base file.
"hoodie.parquet.small.file.limit": -1
}
# From each entry inside `boundaries` we create a dataframe with 3 records:
# one with the first tuple element as recordkey, one with the second tuple
# element as recordkey, and one with a recordkey in between the previous 2
# (the max key with its trailing 3-digit suffix decremented by one). We insert
# every dataframe separately, so in total we perform 100 insert operations and
# Hudi generates 100 base files (hoodie.parquet.small.file.limit is -1). The
# third generated record of each iteration is also collected in `updates`, as
# we'll upsert those later on.
# NOTE: the original paste lost the "#" prefixes on this comment and the
# loop-body indentation (mailing-list wrapping); both are restored here.
updates=[]
for t in boundaries:
    print(t)
    # Key strictly between t[0] and t[1]: decrement t[1]'s last 3 digits by 1.
    calculated_id=t[1][ 0 : 17 ] + "{:03d}".format(int(t[1][ 17 : 20 ])-1)
    inputDF = spark.createDataFrame(
        [
            (t[0], "1", t[0][ 0 : 12 ],2020,1),
            (t[1], "1", t[1][ 0 : 12 ],2020,1),
            (calculated_id, "1", t[1][ 0 : 12 ],2020,1)
        ],
        ["id", "value", "timestamp","year","month"]
    )
    updates.append((calculated_id,"2",2020,1))
    inputDF.write.format("org.apache.hudi")\
        .option("hoodie.datasource.write.operation", "insert")\
        .options(**hudi_options_opt)\
        .mode("append")\
        .save(COW_TABLE_LOCATION)
# Read the table back after the 100 insert commits.
cow_table_opt = (
    spark.read.format('org.apache.hudi')
    .options(**hudi_options_opt)
    .load(COW_TABLE_LOCATION)
)
# Count the rows: 300 at this point (3 records x 100 inserts), as expected.
cow_table_opt.count()
# Generate a dataframe with the 100 rows to upsert (same keys as the
# "calculated" records inserted above, value changed from "1" to "2").
inputDF = spark.createDataFrame(
    updates,
    ["id", "value","year","month"]
)
# Derive the timestamp column from the first 12 characters of the record key.
# Spark's substring() is 1-based (a pos of 0 is treated like 1), so start at 1
# explicitly instead of the original 0 — same result, clearer intent.
new_df=inputDF.withColumn("timestamp",substring("id", 1, 12))
# Upsert the dataframe. This dataframe contains 100 updates, one per base file.
(new_df.write.format("org.apache.hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_options_opt)
    .mode("append")
    .save(COW_TABLE_LOCATION))
# Re-read the table after the upsert.
cow_table_opt = (
    spark.read.format('org.apache.hudi')
    .options(**hudi_options_opt)
    .load(COW_TABLE_LOCATION)
)
# Count the rows: we get 400, which is wrong — the upsert only touched
# existing keys, so the count should still be 300.
cow_table_opt.count()
# Show the duplicated record keys (count > 1). Use col("count").desc():
# `desc` is never imported at the top of the script (only col, concat, lit,
# max, min, substring are), so the original orderBy(desc("count")) raised a
# NameError; Column.desc() uses the already-imported `col` instead.
cow_table_opt.groupBy("_hoodie_record_key").count().orderBy(col("count").desc()).show()
```
Can you give it a try?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]