[
https://issues.apache.org/jira/browse/HUDI-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aditya Goenka updated HUDI-6946:
--------------------------------
Description:
Github Issue -
[https://github.com/apache/hudi/issues/9870]
Code to Reproduce -
```
import uuid
from pyspark.sql.functions import col, concat, lit, max, min, substring, desc

COW_TABLE_NAME = "table_duplicates"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
COW_TABLE_LOCATION = "file:///tmp/issue_9870_" + str(uuid.uuid4())
hudi_options_opt = {
"hoodie.table.name": COW_TABLE_NAME,
"hoodie.table.type": "COPY_ON_WRITE",
"hoodie.index.type": "BLOOM",
"hoodie.datasource.write.recordkey.field": "id",
"hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
"hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.metadata.enable": "true",
"hoodie.bloom.index.use.metadata": "true",
"hoodie.metadata.index.column.stats.enable": "true",
"hoodie.parquet.small.file.limit": -1
}
inputDF = spark.createDataFrame(
[
('1', "1", '1',2020,1),
('2', "1", '1',2020,1),
('3', "1", '1',2020,1)
],
["id", "value", "timestamp","year","month"]
)
(inputDF.write.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "upsert")
.options(**hudi_options_opt)
.mode("append")
.save(COW_TABLE_LOCATION))
upsertDF = spark.createDataFrame(
[
('3', "2", '1',2020,1)
],
["id", "value", "timestamp","year","month"]
)
(upsertDF.write.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "upsert")
.options(**hudi_options_opt)
.mode("append")
.save(COW_TABLE_LOCATION))
spark.read.format('org.apache.hudi').load(COW_TABLE_LOCATION).groupBy("year", "month", "_hoodie_record_key").count().orderBy(desc("count")).show(100, False)
```
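For context, a simplified, hypothetical sketch of min/max key-range pruning (illustrative only, not Hudi's actual implementation): the bloom index first narrows the candidate files for a lookup key using the per-file min/max record-key ranges from the column-stats index, so if a wrong range is returned, an existing key matches no file, the upsert is treated as a new insert, and a duplicate appears. The file names and the `candidate_files` helper below are invented for illustration.

```python
# Hypothetical sketch of key-range pruning (illustrative only, not Hudi code).
# Each "file" carries the min and max record key it contains; a lookup key is
# routed to a file only when min_key <= key <= max_key (string comparison).

def candidate_files(files, key):
    """Return the files whose [min_key, max_key] range may contain `key`."""
    return [name for name, (lo, hi) in files.items() if lo <= key <= hi]

files = {
    "f1.parquet": ("202001010800_W01D000", "202001011142_W08D510"),
    "f2.parquet": ("202001011142_W08D511", "202001011527_W09D191"),
}

# A key inside f1's range is routed to f1, so the upsert updates in place:
assert candidate_files(files, "202001011142_W08D509") == ["f1.parquet"]

# If the index reported a wrong (too small) max key for f1, the same key
# would match no file and be written as a brand-new record -- a duplicate:
bad_files = {"f1.parquet": ("202001010800_W01D000", "202001011142_W08D500")}
assert candidate_files(bad_files, "202001011142_W08D509") == []
```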
was:
Github Issue -
[https://github.com/apache/hudi/issues/9870]
Code to Reproduce -
```
from pyspark.sql.functions import col, concat, lit, max, min, substring, desc
#boundaries contains the min and max record keys for our parquet files
boundaries=[
("202001010800_W01D000","202001011142_W08D510"),
("202001011142_W08D511","202001011527_W09D191"),
("202001011527_W09D192","202001011918_W06D060")
]
COW_TABLE_NAME="table_duplicates"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
COW_TABLE_LOCATION="file:///tmp/issue_9870_1000"
hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.parquet.small.file.limit": -1
}
updates = []
for t in boundaries:
    print(t)
    # build a record key one below the file's max key
    calculated_id = t[1][0:17] + "{:03d}".format(int(t[1][17:20]) - 1)
    inputDF = spark.createDataFrame(
        [
            (t[0], "1", t[0][0:12], 2020, 1),
            (t[1], "1", t[1][0:12], 2020, 1),
            (calculated_id, "1", t[1][0:12], 2020, 1)
        ],
        ["id", "value", "timestamp", "year", "month"]
    )
    updates.append((calculated_id, "2", 2020, 1))
    inputDF.write.format("org.apache.hudi") \
        .option("hoodie.datasource.write.operation", "insert") \
        .options(**hudi_options_opt) \
        .mode("append") \
        .save(COW_TABLE_LOCATION)
#read the table
cow_table_opt = spark.read.format('org.apache.hudi') \
.options(**hudi_options_opt) \
.load(COW_TABLE_LOCATION)
#count the rows: we have 300 rows
cow_table_opt.count()
#generate a dataframe with the rows to upsert
inputDF = spark.createDataFrame(
updates,
["id", "value","year","month"]
)
#add timestamp column
new_df=inputDF.withColumn("timestamp",substring("id", 0, 12))
(new_df.write.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "upsert")
.options(**hudi_options_opt)
.mode("append")
.save(COW_TABLE_LOCATION))
#read the new data
cow_table_opt = spark.read.format('org.apache.hudi') \
.options(**hudi_options_opt) \
.load(COW_TABLE_LOCATION)
cow_table_opt.count()
#Here we can see we have duplicates
cow_table_opt.groupBy("year", "month", "_hoodie_record_key").count().orderBy(desc("count")).show(100, False)
```
> Data Duplicates with range pruning while using hoodie.bloom.index.use.metadata
> ------------------------------------------------------------------------------
>
> Key: HUDI-6946
> URL: https://issues.apache.org/jira/browse/HUDI-6946
> Project: Apache Hudi
> Issue Type: Bug
> Components: metadata, writer-core
> Affects Versions: 0.13.1, 0.12.3, 0.14.0
> Reporter: Aditya Goenka
> Priority: Blocker
> Fix For: 0.12.4, 0.14.1
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)