ssandona commented on issue #9870:
URL: https://github.com/apache/hudi/issues/9870#issuecomment-1764673796

   @ad1happy2go I was able to create a reproducible example. With Hudi 
0.13.1, this creates duplicates. Here is the code:
   
   ```
   from pyspark.sql.functions import col, concat, lit, max, min, substring
   
   #boundaries contains the min and max record keys for our parquet files
   
   boundaries=[
   ("202001010800_W01D000","202001011142_W08D510"),
   ("202001011142_W08D511","202001011527_W09D191"),
   ("202001011527_W09D192","202001011918_W06D060"),
   ("202001011918_W06D061","202001021035_W07D398"),
   ("202001021035_W07D399","202001021446_W06D323"),
   ("202001021446_W06D324","202001021849_W10D611"),
   ("202001021849_W10D612","202001031032_W09D093"),
   ("202001031032_W09D094","202001031421_W02D642"),
   ("202001031421_W02D643","202001031819_W04D820"),
   ("202001031819_W04D821","202001040942_W04D517"),
   ("202001040942_W04D518","202001041339_W05D194"),
   ("202001041339_W05D195","202001041750_W04D705"),
   ("202001041750_W04D706","202001050943_W08D747"),
   ("202001050943_W08D748","202001051334_W03D323"),
   ("202001051334_W03D324","202001051734_W07D131"),
   ("202001051734_W07D132","202001060834_W06D931"),
   ("202001060834_W06D932","202001061214_W08D560"),
   ("202001061214_W08D561","202001061609_W03D994"),
   ("202001061609_W03D995","202001061936_W06D214"),
   ("202001061936_W06D215","202001071058_W01D516"),
   ("202001071058_W01D517","202001071433_W04D787"),
   ("202001071433_W04D788","202001071816_W02D479"),
   ("202001071816_W02D480","202001080943_W09D760"),
   ("202001080943_W09D761","202001081321_W09D203"),
   ("202001081321_W09D204","202001081709_W09D613"),
   ("202001081709_W09D614","202001090832_W04D040"),
   ("202001090832_W04D041","202001091218_W03D112"),
   ("202001091218_W03D113","202001091617_W04D122"),
   ("202001091617_W04D123","202001100813_W05D845"),
   ("202001100813_W05D846","202001101159_W02D784"),
   ("202001101159_W02D785","202001101602_W08D300"),
   ("202001101602_W08D301","202001101933_W04D707"),
   ("202001101933_W04D708","202001111117_W08D334"),
   ("202001111117_W08D335","202001111445_W09D842"),
   ("202001111445_W09D843","202001111839_W02D197"),
   ("202001111839_W02D198","202001121000_W09D790"),
   ("202001121000_W09D791","202001121402_W09D191"),
   ("202001121402_W09D192","202001121758_W08D490"),
   ("202001121758_W08D491","202001130945_W04D736"),
   ("202001130945_W04D737","202001131347_W08D641"),
   ("202001131347_W08D642","202001131709_W04D193"),
   ("202001131709_W04D194","202001140838_W08D085"),
   ("202001140838_W08D086","202001141239_W06D624"),
   ("202001141239_W06D625","202001141631_W01D250"),
   ("202001141631_W01D251","202001142037_W08D870"),
   ("202001142037_W08D872","202001151148_W03D994"),
   ("202001151148_W03D995","202001151548_W09D868"),
   ("202001151548_W09D869","202001151938_W01D811"),
   ("202001151938_W01D812","202001161106_W01D647"),
   ("202001161106_W01D648","202001161444_W08D145"),
   ("202001161444_W08D146","202001161818_W09D040"),
   ("202001161818_W09D041","202001170948_W09D291"),
   ("202001170948_W09D292","202001171340_W05D108"),
   ("202001171340_W05D109","202001171737_W01D174"),
   ("202001171737_W01D175","202001180857_W04D157"),
   ("202001180857_W04D158","202001181254_W03D190"),
   ("202001181254_W03D191","202001181654_W08D119"),
   ("202001181654_W08D120","202001190808_W05D553"),
   ("202001190808_W05D554","202001191148_W09D515"),
   ("202001191148_W09D516","202001191521_W10D747"),
   ("202001191521_W10D748","202001191911_W03D566"),
   ("202001191911_W03D567","202001201041_W10D180"),
   ("202001201041_W10D181","202001201438_W02D444"),
   ("202001201438_W02D445","202001201840_W09D433"),
   ("202001201840_W09D434","202001210954_W10D580"),
   ("202001210954_W10D581","202001211334_W04D053"),
   ("202001211334_W04D054","202001211713_W03D322"),
   ("202001211713_W03D323","202001220857_W08D820"),
   ("202001220857_W08D821","202001221238_W09D113"),
   ("202001221238_W09D114","202001221643_W02D631"),
   ("202001221643_W02D632","202001230811_W03D917"),
   ("202001230811_W03D918","202001231227_W03D824"),
   ("202001231227_W03D825","202001231636_W05D331"),
   ("202001231636_W05D332","202001232012_W06D780"),
   ("202001232012_W06D781","202001241110_W03D861"),
   ("202001241110_W03D862","202001241457_W03D479"),
   ("202001241457_W03D480","202001241836_W02D045"),
   ("202001241836_W02D046","202001251012_W02D123"),
   ("202001251012_W02D124","202001251412_W08D887"),
   ("202001251412_W08D888","202001251753_W03D975"),
   ("202001251753_W03D976","202001260910_W09D539"),
   ("202001260910_W09D540","202001261310_W03D550"),
   ("202001261310_W03D551","202001261657_W06D010"),
   ("202001261657_W06D011","202001270819_W04D626"),
   ("202001270819_W04D627","202001271225_W01D259"),
   ("202001271225_W01D260","202001271627_W07D108"),
   ("202001271627_W07D109","202001272020_W01D489"),
   ("202001272020_W01D490","202001281138_W02D526"),
   ("202001281138_W02D527","202001281535_W06D808"),
   ("202001281535_W06D809","202001281925_W07D727"),
   ("202001281925_W07D728","202001291042_W02D324"),
   ("202001291042_W02D325","202001291455_W05D184"),
   ("202001291455_W05D185","202001291832_W04D872"),
   ("202001291832_W04D873","202001300952_W08D536"),
   ("202001300952_W08D537","202001301333_W02D246"),
   ("202001301333_W02D247","202001301701_W06D724"),
   ("202001301701_W06D725","202001310854_W10D030"),
   ("202001310854_W10D031","202001311236_W01D447"),
   ("202001311236_W01D448","202001311625_W05D671"),
   ("202001311625_W05D672","202001312101_W10D776")
   ]
   
   # Target copy-on-write table: its name and location, plus the columns used
   # as partition path and as precombine field.
   COW_TABLE_NAME = "table_duplicates"
   PARTITION_FIELD = "year,month"
   PRECOMBINE_FIELD = "timestamp"
   COW_TABLE_LOCATION = "s3://mybucket/datasets/table_duplicates/"
   
   # Options shared by every Hudi read and write below, assembled group by
   # group in the same key order throughout.
   hudi_options_opt = {}
   
   # Table identity and index type.
   hudi_options_opt.update({
       "hoodie.table.name": COW_TABLE_NAME,
       "hoodie.table.type": "COPY_ON_WRITE",
       "hoodie.index.type": "BLOOM",
   })
   
   # Record key, partition path and precombine columns for writes.
   hudi_options_opt.update({
       "hoodie.datasource.write.recordkey.field": "id",
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
       "hoodie.datasource.write.hive_style_partitioning": "true",
   })
   
   # Hive sync: enabled, in "hms" mode, with JDBC turned off.
   hudi_options_opt.update({
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode": "hms",
   })
   
   # Metadata table with bloom-filter and column-stats indexes, and the bloom
   # index configured to use the metadata table.
   hudi_options_opt.update({
       "hoodie.metadata.enable": "true",
       "hoodie.metadata.index.bloom.filter.enable": "true",
       "hoodie.bloom.index.use.metadata": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
       "hoodie.metadata.index.column.stats.column.list": "id,timestamp,value",
   })
   
   # Small-file limit of -1, so that each insert ends up in its own base file.
   hudi_options_opt.update({
       "hoodie.parquet.small.file.limit": -1,
   })
   
   # From each entry in boundaries we create a dataframe with 3 records: one
   # with the first tuple element as record key, one with the second tuple
   # element as record key, and one with a record key in between the previous
   # two. We insert every dataframe. In total we'll have 100 insert operations
   # and also 100 generated base files in Hudi (as we set
   # hoodie.parquet.small.file.limit to -1). In addition, the third generated
   # record of each iteration is added to updates, as we'll upsert it later on.
   
   # Keys (with a new value) that will be upserted after all inserts are done.
   updates = []
   for t in boundaries:
       print(t)
       low_key, high_key = t
       # Third key: the upper key's first 17 characters followed by its
       # 3-digit numeric suffix decremented by one.
       suffix = int(high_key[17:20]) - 1
       calculated_id = f"{high_key[:17]}{suffix:03d}"
       rows = [
           (low_key, "1", low_key[:12], 2020, 1),
           (high_key, "1", high_key[:12], 2020, 1),
           (calculated_id, "1", high_key[:12], 2020, 1),
       ]
       inputDF = spark.createDataFrame(
           rows,
           ["id", "value", "timestamp", "year", "month"],
       )
       updates.append((calculated_id, "2", 2020, 1))
   
       # One insert operation per boundaries entry.
       (
           inputDF.write.format("org.apache.hudi")
           .option("hoodie.datasource.write.operation", "insert")
           .options(**hudi_options_opt)
           .mode("append")
           .save(COW_TABLE_LOCATION)
       )
   
   # Read the table back once all the inserts are done.
   cow_table_opt = (
       spark.read.format('org.apache.hudi')
       .options(**hudi_options_opt)
       .load(COW_TABLE_LOCATION)
   )
   
   # Count the rows: we have 300 rows.
   cow_table_opt.count()
   
   # Generate a dataframe with the rows to upsert.
   inputDF = spark.createDataFrame(updates, ["id", "value", "year", "month"])
   
   # Add the timestamp column, computed as a substring of the id.
   new_df = inputDF.withColumn("timestamp", substring("id", 0, 12))
   
   # Upsert the dataframe. This dataframe contains 100 updates.
   upsert_writer = (
       new_df.write.format("org.apache.hudi")
       .option("hoodie.datasource.write.operation", "upsert")
       .options(**hudi_options_opt)
       .mode("append")
   )
   upsert_writer.save(COW_TABLE_LOCATION)
   
   # Read the table again to pick up the upserted data.
   cow_table_opt = (
       spark.read.format('org.apache.hudi')
       .options(**hudi_options_opt)
       .load(COW_TABLE_LOCATION)
   )
   
   # Count the rows: we have 400 rows, which is wrong; there should be 300.
   cow_table_opt.count()
   
   # Here we can see we have duplicates: group the rows by Hudi record key and
   # show the keys with the highest occurrence count first.
   # The ordering uses col("count").desc() because only col (not desc) is
   # imported from pyspark.sql.functions at the top of this script; calling a
   # bare desc(...) here would raise NameError.
   (
       cow_table_opt.groupBy("_hoodie_record_key")
       .count()
       .orderBy(col("count").desc())
       .show()
   )
   ```
   
   Can you give it a try?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to