[ 
https://issues.apache.org/jira/browse/HUDI-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya Goenka updated HUDI-6946:
--------------------------------
    Description: 
Github Issue - 

[https://github.com/apache/hudi/issues/9870]

 

Code to Reproduce - 

```
from pyspark.sql.functions import col, concat, lit, max, min, substring, desc

#boundaries contains the min and max record keys for our parquet files

boundaries=[
("202001010800_W01D000","202001011142_W08D510"),
("202001011142_W08D511","202001011527_W09D191"),
("202001011527_W09D192","202001011918_W06D060")
]

COW_TABLE_NAME="table_duplicates"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
COW_TABLE_LOCATION="file:///tmp/issue_9870_1000"

hudi_options_opt = { "hoodie.table.name": COW_TABLE_NAME, "hoodie.table.type": "COPY_ON_WRITE", 
"hoodie.index.type": "BLOOM", "hoodie.datasource.write.recordkey.field": "id", 
"hoodie.datasource.write.partitionpath.field": PARTITION_FIELD, 
"hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD, 
"hoodie.datasource.write.hive_style_partitioning": "true", 
"hoodie.metadata.enable": "true", "hoodie.bloom.index.use.metadata": "true", 
"hoodie.metadata.index.column.stats.enable": "true", 
"hoodie.parquet.small.file.limit": -1 }

updates=[]
for t in boundaries:
print(t)
calculated_id=t[1][ 0 : 17 ] + "{:03d}".format(int(t[1][ 17 : 20 ])-1)
inputDF = spark.createDataFrame(
[
(t[0], "1", t[0][ 0 : 12 ],2020,1),
(t[1], "1", t[1][ 0 : 12 ],2020,1),
(calculated_id, "1", t[1][ 0 : 12 ],2020,1)
],
["id", "value", "timestamp","year","month"]
)
updates.append((calculated_id,"2",2020,1))

inputDF.write.format("org.apache.hudi") \
.option("hoodie.datasource.write.operation", "insert") \
.options(**hudi_options_opt) \
.mode("append") \
.save(COW_TABLE_LOCATION)

#read the table
cow_table_opt = spark.read.format('org.apache.hudi') \
.options(**hudi_options_opt) \
.load(COW_TABLE_LOCATION)

#count the rows: we have 300 rows
cow_table_opt.count()

#generate a dataframe with the rows to upsert
inputDF = spark.createDataFrame(
updates,
["id", "value","year","month"]
)

#add timestamp column
new_df=inputDF.withColumn("timestamp",substring("id", 0, 12))

(new_df.write.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "upsert")
.options(**hudi_options_opt)
.mode("append")
.save(COW_TABLE_LOCATION))

#read the new data
cow_table_opt = spark.read.format('org.apache.hudi') \
.options(**hudi_options_opt) \
.load(COW_TABLE_LOCATION)

cow_table_opt.count()

#Here we can see we have duplicates
cow_table_opt.groupBy("year","month","_hoodie_record_key").count().orderBy(desc("count")).show(100,
 False)

```

  was:
Github Issue - 

[https://github.com/apache/hudi/issues/9870]

 

Code to Reproduce - 

```
from pyspark.sql.functions import col, concat, lit, max, min, substring, desc

#boundaries contains the min and max record keys for our parquet files

boundaries=[
("202001010800_W01D000","202001011142_W08D510"),
("202001011142_W08D511","202001011527_W09D191"),
("202001011527_W09D192","202001011918_W06D060")
]

COW_TABLE_NAME="table_duplicates"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
COW_TABLE_LOCATION="file:///tmp/issue_9870_1000"

hudi_options_opt = {
"hoodie.table.name": COW_TABLE_NAME,
"hoodie.table.type": "COPY_ON_WRITE",
"hoodie.index.type": "BLOOM",
"hoodie.datasource.write.recordkey.field": "id",
"hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
"hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.metadata.enable": "true",
"hoodie.bloom.index.use.metadata": "true",
"hoodie.metadata.index.column.stats.enable": "true",
"hoodie.parquet.small.file.limit": -1
}

#From each entry inside boundaries we create a dataframe with 3 records, one 
with the first tuple element as recordkey, one with the second tuple element as 
recordkey and one with a recordkey in between the previous 2. We insert every 
dataframe.
# In total we'll have 100 insert operations and also 100 generated base files 
in Hudi (as we specified hoodie.parquet.small.file.limit as -1). In addition 
the third generated record for each iteration is included inside updates as 
we'll later on upsert that

updates=[]
for t in boundaries:
print(t)
calculated_id=t[1][ 0 : 17 ] + "{:03d}".format(int(t[1][ 17 : 20 ])-1)
inputDF = spark.createDataFrame(
[
(t[0], "1", t[0][ 0 : 12 ],2020,1),
(t[1], "1", t[1][ 0 : 12 ],2020,1),
(calculated_id, "1", t[1][ 0 : 12 ],2020,1)
],
["id", "value", "timestamp","year","month"]
)
updates.append((calculated_id,"2",2020,1))

inputDF.write.format("org.apache.hudi") \
.option("hoodie.datasource.write.operation", "insert") \
.options(**hudi_options_opt) \
.mode("append") \
.save(COW_TABLE_LOCATION)

#read the table
cow_table_opt = spark.read.format('org.apache.hudi') \
.options(**hudi_options_opt) \
.load(COW_TABLE_LOCATION)

#count the rows: we have 300 rows
cow_table_opt.count()

#generate a dataframe with the rows to upsert
inputDF = spark.createDataFrame(
updates,
["id", "value","year","month"]
)

#add timestamp column
new_df=inputDF.withColumn("timestamp",substring("id", 0, 12))

#upsert the dataframe. This dataframe contains 100 updates
(new_df.write.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "upsert")
.options(**hudi_options_opt)
.mode("append")
.save(COW_TABLE_LOCATION))

#read the new data
cow_table_opt = spark.read.format('org.apache.hudi') \
.options(**hudi_options_opt) \
.load(COW_TABLE_LOCATION)


#count the rows: we have 400 rows which is wrong, those should be 300
cow_table_opt.count()

#Here we can see we have duplicates
cow_table_opt.groupBy("year","month","_hoodie_record_key").count().orderBy(desc("count")).show(100,
 False)


```


> Data Duplicates with range pruning while using hoodie.bloom.index.use.metadata
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-6946
>                 URL: https://issues.apache.org/jira/browse/HUDI-6946
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: metadata, writer-core
>    Affects Versions: 0.13.1, 0.12.3, 0.14.0
>            Reporter: Aditya Goenka
>            Priority: Blocker
>             Fix For: 0.12.4, 0.14.1
>
>
> Github Issue - 
> [https://github.com/apache/hudi/issues/9870]
>  
> Code to Reproduce - 
> ```
> from pyspark.sql.functions import col, concat, lit, max, min, substring, desc
> #boundaries contains the min and max record keys for our parquet files
> boundaries=[
> ("202001010800_W01D000","202001011142_W08D510"),
> ("202001011142_W08D511","202001011527_W09D191"),
> ("202001011527_W09D192","202001011918_W06D060")
> ]
> COW_TABLE_NAME="table_duplicates"
> PARTITION_FIELD = "year,month"
> PRECOMBINE_FIELD = "timestamp"
> COW_TABLE_LOCATION="file:///tmp/issue_9870_1000"
> hudi_options_opt = { "hoodie.table.name": COW_TABLE_NAME, "hoodie.table.type": "COPY_ON_WRITE", 
> "hoodie.index.type": "BLOOM", "hoodie.datasource.write.recordkey.field": 
> "id", "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD, 
> "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD, 
> "hoodie.datasource.write.hive_style_partitioning": "true", 
> "hoodie.metadata.enable": "true", "hoodie.bloom.index.use.metadata": "true", 
> "hoodie.metadata.index.column.stats.enable": "true", 
> "hoodie.parquet.small.file.limit": -1 }
> updates=[]
> for t in boundaries:
> print(t)
> calculated_id=t[1][ 0 : 17 ] + "{:03d}".format(int(t[1][ 17 : 20 ])-1)
> inputDF = spark.createDataFrame(
> [
> (t[0], "1", t[0][ 0 : 12 ],2020,1),
> (t[1], "1", t[1][ 0 : 12 ],2020,1),
> (calculated_id, "1", t[1][ 0 : 12 ],2020,1)
> ],
> ["id", "value", "timestamp","year","month"]
> )
> updates.append((calculated_id,"2",2020,1))
> inputDF.write.format("org.apache.hudi") \
> .option("hoodie.datasource.write.operation", "insert") \
> .options(**hudi_options_opt) \
> .mode("append") \
> .save(COW_TABLE_LOCATION)
> #read the table
> cow_table_opt = spark.read.format('org.apache.hudi') \
> .options(**hudi_options_opt) \
> .load(COW_TABLE_LOCATION)
> #count the rows: we have 300 rows
> cow_table_opt.count()
> #generate a dataframe with the rows to upsert
> inputDF = spark.createDataFrame(
> updates,
> ["id", "value","year","month"]
> )
> #add timestamp column
> new_df=inputDF.withColumn("timestamp",substring("id", 0, 12))
> (new_df.write.format("org.apache.hudi")
> .option("hoodie.datasource.write.operation", "upsert")
> .options(**hudi_options_opt)
> .mode("append")
> .save(COW_TABLE_LOCATION))
> #read the new data
> cow_table_opt = spark.read.format('org.apache.hudi') \
> .options(**hudi_options_opt) \
> .load(COW_TABLE_LOCATION)
> cow_table_opt.count()
> #Here we can see we have duplicates
> cow_table_opt.groupBy("year","month","_hoodie_record_key").count().orderBy(desc("count")).show(100,
>  False)
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to