ms-baazi opened a new issue, #8835:
URL: https://github.com/apache/hudi/issues/8835
**I'm getting duplicate data while upserting into a partitioned MOR table**
**To Reproduce**
Steps to reproduce the behavior:
I executed the code below to insert data into a new table. The initial insert
performs as expected; no duplicates are observed.
```python
try:
    import os
    import sys
    import uuid
    import time
    import pyspark
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, asc, desc
    from awsglue.utils import getResolvedOptions
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.context import GlueContext
    from faker import Faker

    print("All modules are loaded .....")
except Exception as e:
    print("Some modules are missing {} ".format(e))

database_name1 = "hudi_db"
table_name = "mor_upsert_partitioned_20230529"
base_s3_path = "s3://hudi-datalake-poc/hudi_data"
final_base_path = "{base_s3_path}/{table_name}".format(
    base_s3_path=base_s3_path, table_name=table_name
)

method = 'upsert'
table_type = "MERGE_ON_READ"

faker = Faker()


class DataGenerator(object):

    @staticmethod
    def get_data(start, end, partition):
        return [
            (
                x,
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                partition,
                faker.random_int(min=10000, max=150000),
                faker.random_int(min=18, max=60),
                faker.random_int(min=0, max=100000),
                int(time.time())
            ) for x in range(start, end)
        ]


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config('spark.serializer',
                'org.apache.spark.serializer.KryoSerializer') \
        .getOrCreate()
    return spark


spark = create_spark_session()
sc = spark.sparkContext
glueContext = GlueContext(sc)

hudi_options = {
    'hoodie.database.name': database_name1,
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'emp_id',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'hoodie.index.type': 'GLOBAL_BLOOM',
    'hoodie.bloom.index.update.partition.path': 'true',
    'hoodie.datasource.write.partitionpath.field': 'state',
    'hoodie.compact.inline.max.delta.commits': '1',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name1,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class':
        'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true'
}

## Clustering
hudi_options['hoodie.clustering.plan.strategy.sort.columns'] = 'department'
hudi_options['hoodie.cleaner.policy'] = 'KEEP_LATEST_FILE_VERSIONS'
hudi_options['hoodie.keep.max.commits'] = '21'
hudi_options['hoodie.clustering.plan.strategy.max.bytes.per.group'] = '107374182400'
hudi_options['hoodie.clustering.plan.strategy.max.num.groups'] = '1'


def insert_data(start, end, partition):
    """Create a Spark DataFrame and upsert it into the Hudi table."""
    data = DataGenerator.get_data(start, end, partition)
    columns = ["emp_id", "employee_name", "department", "state", "salary",
               "age", "bonus", "ts"]
    df = spark.createDataFrame(data=data, schema=columns)
    df.write.format("hudi") \
        .options(**hudi_options) \
        .partitionBy("state") \
        .mode("append") \
        .save(final_base_path)
    print('Rows updated')


## Update data (move existing emp_ids to new partitions)
insert_data(250, 650, 'NEWPARTITION_1')
time.sleep(15)
insert_data(175, 325, 'NEWPARTITION_2')
time.sleep(15)
```
1. The table already had emp_id values from 1 to 2000 across the following
partitions: 'CA', 'NY', 'TX', 'FL', 'IL', 'RJ'.
2. I ran the script above to update the partitions of a subset of records.
3. After executing the code, I looked for duplicates in the MOR `_rt` table.
There were 75 emp_ids with duplicates, the minimum being 250 and the
maximum 324.
4. The duplicate data looked like this:
<img width="1391" alt="image"
src="https://github.com/apache/hudi/assets/132332483/873a5716-b38e-41ec-b040-e9e6d2e12123">
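
For reference, the duplicate counts in step 3 line up with the overlap of the two upserts: emp_ids 250 through 324 are written by both `insert_data` calls, which is exactly 75 keys. Below is a minimal plain-Python sketch of the duplicate check (group rows by record key, flag keys occurring more than once), using synthetic rows in place of an actual read from the `_rt` table:

```python
from collections import Counter

# Synthetic (emp_id, state) rows standing in for a read from the _rt table,
# mirroring the two upserts above: 250-649 to NEWPARTITION_1, then
# 175-324 to NEWPARTITION_2.
rows = [(emp_id, "NEWPARTITION_1") for emp_id in range(250, 650)]
rows += [(emp_id, "NEWPARTITION_2") for emp_id in range(175, 325)]

# With a global index, each emp_id should appear exactly once in the table,
# regardless of which partition it currently lives in.
counts = Counter(emp_id for emp_id, _ in rows)
duplicates = sorted(k for k, c in counts.items() if c > 1)

print(len(duplicates), min(duplicates), max(duplicates))  # 75 250 324
```

This matches the 75 duplicated emp_ids (min 250, max 324) observed in the table, suggesting the second upsert did not reconcile keys already rewritten into NEWPARTITION_1 by the first.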
**Expected behavior**
After a partition change, records are supposed to be removed from the older
partitions and moved to the new ones (since
`hoodie.bloom.index.update.partition.path` is `true`). The records are indeed
gone from the older partitions, but instead of a single copy per key I see
duplicates within the newly created partitions.
**Environment Description**
* Hudi version : 0.12.1
* Spark version : 3.3
* Storage (HDFS/S3/GCS..) : S3
* Platform : Glue 4.0 and Athena
* Running on Docker? (yes/no) : No