ms-baazi opened a new issue, #8835:
URL: https://github.com/apache/hudi/issues/8835

   **I'm getting duplicate data while upserting data to a partitioned MOR table**
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   Executed the code below to insert data into the new table. This performs as expected; no duplicates are observed.
   ```
   try:
       import os
       import sys
       import uuid
       import time
       import pyspark
       from pyspark import SparkConf, SparkContext
       from pyspark.sql import SparkSession
       from pyspark.sql.functions import col, asc, desc
       from awsglue.utils import getResolvedOptions
       from awsglue.dynamicframe import DynamicFrame
       from awsglue.context import GlueContext
   
       from faker import Faker
   
       print("All modules are loaded .....")
   
   except Exception as e:
       print("Some modules are missing {} ".format(e))
   
   database_name1 = "hudi_db"
   table_name = "mor_upsert_partitioned_20230529"
   base_s3_path = "s3://hudi-datalake-poc/hudi_data"
   final_base_path = "{base_s3_path}/{table_name}".format(
       base_s3_path=base_s3_path, table_name=table_name
   )
   
   method = 'upsert'
   table_type = "MERGE_ON_READ"
   
   global faker
   faker = Faker()
   
   
   class DataGenerator(object):
       @staticmethod
       def get_data(start, end, partition):
           return [
               (
                   x,
                   faker.name(),
                   faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                   partition,
                   faker.random_int(min=10000, max=150000),
                   faker.random_int(min=18, max=60),
                   faker.random_int(min=0, max=100000),
                   int(time.time())
               ) for x in range(start, end)
           ]
   
   
   def create_spark_session():
       spark = SparkSession \
           .builder \
           .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
           .getOrCreate()
       return spark
   
   
   spark = create_spark_session()
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   
   hudi_options = {
       'hoodie.database.name': database_name1,
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.recordkey.field': 'emp_id',
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
       "hoodie.datasource.write.table.type": table_type,
       'hoodie.datasource.write.precombine.field': 'ts',
       'hoodie.upsert.shuffle.parallelism': 2,
       'hoodie.insert.shuffle.parallelism': 2,
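       # With a global index and update.partition.path enabled below, an upsert
       # that changes a record's partition value should delete the record from
       # the old partition and write it into the new one.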
       'hoodie.index.type': 'GLOBAL_BLOOM',
       'hoodie.bloom.index.update.partition.path' :'true',
       'hoodie.datasource.write.partitionpath.field': 'state',
       'hoodie.compact.inline.max.delta.commits': '1',
       
       'hoodie.datasource.hive_sync.enable': 'true',
       "hoodie.datasource.hive_sync.mode": "hms",
       'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
       'hoodie.datasource.hive_sync.database': database_name1,
       'hoodie.datasource.hive_sync.table': table_name,
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       'hoodie.datasource.write.hive_style_partitioning': 'true'
        }
   
   ## Clustering
   hudi_options['hoodie.clustering.plan.strategy.sort.columns'] = 'department'
   hudi_options['hoodie.cleaner.policy'] = 'KEEP_LATEST_FILE_VERSIONS'
   hudi_options['hoodie.keep.max.commits'] = '21'
   hudi_options['hoodie.clustering.plan.strategy.max.bytes.per.group'] = '107374182400'
   hudi_options['hoodie.clustering.plan.strategy.max.num.groups'] = '1'
   
   
   def insert_data(start, end, partition):
       # ====================================================
       """Create Spark Data Frame """
       # ====================================================
       data = DataGenerator.get_data(start, end, partition)
   
       columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
       df = spark.createDataFrame(data=data, schema=columns)
       
       df.write.format("hudi") \
           .options(**hudi_options) \
           .partitionBy("state") \
           .mode("append") \
           .save(final_base_path)
       print('Rows updated')
   
   ## update data
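   ## emp_ids 250-649 already exist in the original partitions, so the first call
   ## moves them to NEWPARTITION_1; the second call updates emp_ids 175-324,
   ## re-updating the overlapping keys 250-324 into NEWPARTITION_2.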
   insert_data(250, 650, 'NEWPARTITION_1')
   time.sleep(15)
   insert_data(175, 325, 'NEWPARTITION_2')
   time.sleep(15)
   ```
   
   
   1. The table already had emp_id 1 to 2000 with the following partitions: 'CA', 'NY', 'TX', 'FL', 'IL', 'RJ'.
   2. I ran the script above to update the partition of a few records.
   3. After executing the code, I looked for duplicates in the MOR `_rt` table (a Spark sketch of this check is shown below the list). There were 75 emp_ids with duplicates, the minimum being 250 and the maximum 324.
   4. The duplicate data looked like this:
   <img width="1391" alt="image" src="https://github.com/apache/hudi/assets/132332483/873a5716-b38e-41ec-b040-e9e6d2e12123">
   
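   For reference, a minimal sketch of the duplicate check (assuming the same `final_base_path` and Spark session as in the script above; the actual check was done against the `_rt` table in Athena, so this is just an equivalent Spark sketch):
   ```
   # Snapshot read of the table, grouping on the record key to find emp_ids
   # that appear more than once.
   dup_df = (
       spark.read.format("hudi").load(final_base_path)
       .groupBy("emp_id")
       .count()
       .filter("count > 1")
   )
   print(dup_df.count())            # number of emp_ids with duplicates
   dup_df.orderBy("emp_id").show()  # in this case emp_ids 250 through 324
   ```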
   
   **Expected behavior**
   
   After the partition change, the records were supposed to be removed from the older partitions and moved to the new ones. I don't see those records in the older partitions, but I do see duplicates within the new partition that was created.
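
   To help narrow this down, a small sketch (same table path and Spark session as in the script above; emp_id 250 is just one of the duplicated keys) of tracing each copy of an affected key through Hudi's metadata columns:
   ```
   # Inspect one duplicated key to see which partition and file each copy lives
   # in, and at which commit it was written.
   spark.read.format("hudi").load(final_base_path) \
       .filter("emp_id = 250") \
       .select("_hoodie_commit_time", "_hoodie_partition_path",
               "_hoodie_file_name", "emp_id", "state", "ts") \
       .show(truncate=False)
   ```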
   
   **Environment Description**
   
   * Hudi version : 0.12.1
   * Spark version : 3.3
   * Storage (HDFS/S3/GCS..) : S3
   * Platform : Glue 4.0 and Athena
   * Running on Docker? (yes/no) : No

