[ 
https://issues.apache.org/jira/browse/HUDI-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya Goenka updated HUDI-6946:
--------------------------------
    Description: 
GitHub Issue - 

[https://github.com/apache/hudi/issues/9870]

 

Code to Reproduce - 

```
# Minimal reproduction for HUDI-6946: an upsert of an existing record key
# into a COPY_ON_WRITE table with the BLOOM index reading from the metadata
# table is expected to update the row, but the final query shows the key
# more than once.
#
# Run inside a PySpark shell/notebook with the Hudi bundle available;
# `spark` is the SparkSession supplied by that environment.
import uuid

from pyspark.sql.functions import desc

COW_TABLE_NAME = "table_duplicates"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
# A fresh random directory per run so earlier runs cannot affect the result.
COW_TABLE_LOCATION = "file:///tmp/issue_9870_" + str(uuid.uuid4())

hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.parquet.small.file.limit": -1,
}

COLUMNS = ["id", "value", "timestamp", "year", "month"]

# First commit: three records with keys '1', '2', '3' in partition 2020/1.
inputDF = spark.createDataFrame(
    [
        ('1', "1", '1', 2020, 1),
        ('2', "1", '1', 2020, 1),
        ('3', "1", '1', 2020, 1),
    ],
    COLUMNS,
)

(inputDF.write.format("org.apache.hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_options_opt)
    .mode("append")
    .save(COW_TABLE_LOCATION))

# Second commit: upsert key '3' (already present) with a new value.
upsertDF = spark.createDataFrame(
    [
        ('3', "2", '1', 2020, 1),
    ],
    COLUMNS,
)

(upsertDF.write.format("org.apache.hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_options_opt)
    .mode("append")
    .save(COW_TABLE_LOCATION))

# Count rows per record key; any count above 1 is a duplicate.
(spark.read.format('org.apache.hudi')
    .load(COW_TABLE_LOCATION)
    .groupBy("year", "month", "_hoodie_record_key")
    .count()
    .orderBy(desc("count"))
    .show(100, False))
```

  was:
GitHub Issue - 

[https://github.com/apache/hudi/issues/9870]

 

Code to Reproduce - 

```
# Earlier reproduction for HUDI-6946: insert three batches whose record keys
# sit at the min/max boundaries listed below, then upsert one existing key
# per batch and look for duplicated record keys.
#
# Run inside a PySpark shell/notebook with the Hudi bundle available;
# `spark` is the SparkSession supplied by that environment.
from pyspark.sql.functions import desc, substring

# boundaries contains the min and max record keys for our parquet files
boundaries = [
    ("202001010800_W01D000", "202001011142_W08D510"),
    ("202001011142_W08D511", "202001011527_W09D191"),
    ("202001011527_W09D192", "202001011918_W06D060"),
]

COW_TABLE_NAME = "table_duplicates"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
COW_TABLE_LOCATION = "file:///tmp/issue_9870_1000"

hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.parquet.small.file.limit": -1,
}

updates = []
for t in boundaries:
    print(t)
    # A key strictly inside the range: same 17-character prefix as the max
    # key, with its 3-digit numeric suffix decremented by one.
    calculated_id = t[1][0:17] + "{:03d}".format(int(t[1][17:20]) - 1)
    inputDF = spark.createDataFrame(
        [
            (t[0], "1", t[0][0:12], 2020, 1),
            (t[1], "1", t[1][0:12], 2020, 1),
            (calculated_id, "1", t[1][0:12], 2020, 1),
        ],
        ["id", "value", "timestamp", "year", "month"],
    )
    # Remember the in-range key so it can be upserted after all inserts.
    updates.append((calculated_id, "2", 2020, 1))

    # One insert commit per boundary pair.
    (inputDF.write.format("org.apache.hudi")
        .option("hoodie.datasource.write.operation", "insert")
        .options(**hudi_options_opt)
        .mode("append")
        .save(COW_TABLE_LOCATION))

# read the table
cow_table_opt = (spark.read.format('org.apache.hudi')
    .options(**hudi_options_opt)
    .load(COW_TABLE_LOCATION))

# count the rows: 3 inserts of 3 distinct keys each, so 9 rows with the
# data above
cow_table_opt.count()

# generate a dataframe with the rows to upsert
inputDF = spark.createDataFrame(
    updates,
    ["id", "value", "year", "month"],
)

# add timestamp column (the first 12 characters of the id)
new_df = inputDF.withColumn("timestamp", substring("id", 0, 12))

(new_df.write.format("org.apache.hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_options_opt)
    .mode("append")
    .save(COW_TABLE_LOCATION))

# read the new data
cow_table_opt = (spark.read.format('org.apache.hudi')
    .options(**hudi_options_opt)
    .load(COW_TABLE_LOCATION))

cow_table_opt.count()

# Here we can see we have duplicates: any key with count > 1
(cow_table_opt
    .groupBy("year", "month", "_hoodie_record_key")
    .count()
    .orderBy(desc("count"))
    .show(100, False))

```


> Data Duplicates with range pruning while using hoodie.bloom.index.use.metadata
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-6946
>                 URL: https://issues.apache.org/jira/browse/HUDI-6946
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: metadata, writer-core
>    Affects Versions: 0.13.1, 0.12.3, 0.14.0
>            Reporter: Aditya Goenka
>            Priority: Blocker
>             Fix For: 0.12.4, 0.14.1
>
>
> GitHub Issue - 
> [https://github.com/apache/hudi/issues/9870]
>  
> Code to Reproduce - 
> ```
> from pyspark.sql.functions import col, concat, lit, max, min, substring, desc
> COW_TABLE_NAME="table_duplicates"
> PARTITION_FIELD = "year,month"
> PRECOMBINE_FIELD = "timestamp"
> COW_TABLE_LOCATION="file:///tmp/issue_9870_" + str(uuid.uuid4())
> hudi_options_opt = {
> "hoodie.table.name": COW_TABLE_NAME,
> "hoodie.table.type": "COPY_ON_WRITE",
> "hoodie.index.type": "BLOOM",
> "hoodie.datasource.write.recordkey.field": "id",
> "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
> "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
> "hoodie.datasource.write.hive_style_partitioning": "true",
> "hoodie.metadata.enable": "true",
> "hoodie.bloom.index.use.metadata": "true",
> "hoodie.metadata.index.column.stats.enable": "true",
> "hoodie.parquet.small.file.limit": -1
> }
> COW_TABLE_LOCATION="file:///tmp/issue_9870_" + str(uuid.uuid4())
> inputDF = spark.createDataFrame(
> [
> ('1', "1", '1',2020,1),
> ('2', "1", '1',2020,1),
> ('3', "1", '1',2020,1)
> ],
> ["id", "value", "timestamp","year","month"]
> )
> (inputDF.write.format("org.apache.hudi")
> .option("hoodie.datasource.write.operation", "upsert")
> .options(**hudi_options_opt)
> .mode("append")
> .save(COW_TABLE_LOCATION))
> upsertDF = spark.createDataFrame(
> [
> ('3', "2", '1',2020,1)
> ],
> ["id", "value", "timestamp","year","month"]
> )
> (upsertDF.write.format("org.apache.hudi")
> .option("hoodie.datasource.write.operation", "upsert")
> .options(**hudi_options_opt)
> .mode("append")
> .save(COW_TABLE_LOCATION))
> spark.read.format('org.apache.hudi').load(COW_TABLE_LOCATION).groupBy("year","month","_hoodie_record_key").count().orderBy(desc("count")).show(100,
>  False)
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to