abhisheksp opened a new issue, #12128:
URL: https://github.com/apache/hudi/issues/12128

   **Describe the problem you faced**
   
   I am observing duplicate records across partitions while using the GLOBAL_BLOOM index. However, the duplicates don't appear with the GLOBAL_SIMPLE index.
   
   
   
   Steps to reproduce the behavior:
   
   1. Run spark shell
   ```
   spark-shell --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
   --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
   ```
   2. Run the GLOBAL_BLOOM index code below
   3. Observe the final output and find duplicate records for record id = 1 (John)
   4. Run the GLOBAL_SIMPLE index code and observe that there are no duplicates
   
   ### Code to reproduce the issue with GLOBAL_BLOOM index
   
   ```
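   import org.apache.spark.sql.{SaveMode, SparkSession}
   // Needed for QUERY_TYPE and QUERY_TYPE_SNAPSHOT_OPT_VAL used in the reads below
   import org.apache.hudi.DataSourceReadOptions._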
   
   
   val basePath = "/Volumes/workplace/adhoc-work/some_hudi_path"
   val tableName = "my_hudi_table"
   val recordKeyField = "id"
   val preCombinedField = "last_update_time"
   val partitionPathField = "country,year"
   
   val session = SparkSession.builder().appName("app_name").master("local").getOrCreate()
   
   
   // Clear the data at basePath if it exists (same approach as the GLOBAL_SIMPLE snippet).
   // Deleting the directory also works on the first run, when there is no table to load yet.
   val fs = new org.apache.hadoop.fs.Path(basePath)
     .getFileSystem(session.sparkContext.hadoopConfiguration)
   fs.delete(new org.apache.hadoop.fs.Path(basePath), true)
   
   
   val initialData = Seq(
     (1, "John", "USA", 2023, "2015-01-01T13:51:39.340396Z"),
     (2, "Alice", "Canada", 2023, "2015-01-02T13:51:39.340396Z"),
     (3, "Hulk", "UK", 2023, "2015-01-02T13:51:39.340396Z")
   ).toDF("id", "name", "country", "year", "last_update_time")
   
   
   initialData.write.format("hudi")
     .option("hoodie.table.name", tableName)
     .option("hoodie.datasource.write.recordkey.field", recordKeyField)
     .option("hoodie.datasource.write.partitionpath.field", partitionPathField)
     .option("hoodie.datasource.write.precombine.field", preCombinedField)
     .option("hoodie.datasource.write.table.name", tableName)
     .option("hoodie.datasource.write.operation", "upsert")
     .option("hoodie.bloom.index.type", "GLOBAL_BLOOM")
     .option("hoodie.bloom.index.update.partition.path", "true")
     .mode(SaveMode.Append)
     .save(basePath)
   
   println("Data before partition-changing update:")
   session.read.format("hudi")
     .option(QUERY_TYPE.key, QUERY_TYPE_SNAPSHOT_OPT_VAL)
     .option("hoodie.table.name", tableName)
     .option("hoodie.datasource.write.recordkey.field", recordKeyField)
     .load(basePath)
     .show()
   
   /*
   +-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id| name|    last_update_time|country|year|
   +-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
   |  20241018141559079|20241018141559079...|                 2|           Canada/2023|c8045504-c666-4a6...|  2|Alice|2015-01-02T13:51:...| Canada|2023|
   |  20241018141559079|20241018141559079...|                 1|              USA/2023|deed664c-faf5-4ab...|  1| John|2015-01-01T13:51:...|    USA|2023|
   |  20241018141559079|20241018141559079...|                 3|               UK/2023|9d44bfff-2cbc-44b...|  3| Hulk|2015-01-02T13:51:...|     UK|2023|
   +-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
   */
   
   
   // Updated data with partition change (country USA -> UK)
   val updatedData = Seq(
     (1, "John", "UK", 2023, "2015-01-03T13:51:39.340396Z") // Changed country from USA to UK
   ).toDF("id", "name", "country", "year", "last_update_time")
   
   // Perform the partition-changing update with global Bloom index
   updatedData.write.format("hudi")
     .option("hoodie.table.name", tableName)
     .option("hoodie.datasource.write.recordkey.field", recordKeyField)
     .option("hoodie.datasource.write.partitionpath.field", partitionPathField)
     .option("hoodie.datasource.write.table.name", tableName)
     .option("hoodie.datasource.write.operation", "upsert")
     .option("hoodie.datasource.write.precombine.field", preCombinedField)
     .option("hoodie.bloom.index.type", "GLOBAL_BLOOM")
     .option("hoodie.bloom.index.update.partition.path", "true")
     .mode(SaveMode.Append)
     .save(basePath)
   
   
   println("Data after partition-changing update:")
   session.read.format("hudi")
     .option(QUERY_TYPE.key, QUERY_TYPE_SNAPSHOT_OPT_VAL)
     .option("hoodie.table.name", tableName)
     .option("hoodie.datasource.write.recordkey.field", recordKeyField)
     .load(basePath)
     .show()
   
   /*
   +-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id| name|    last_update_time|country|year|
   +-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
   |  20241018141559079|20241018141559079...|                 3|               UK/2023|9d44bfff-2cbc-44b...|  3| Hulk|2015-01-02T13:51:...|     UK|2023|
   |  20241018141639952|20241018141639952...|                 1|               UK/2023|9d44bfff-2cbc-44b...|  1| John|2015-01-03T13:51:...|     UK|2023|
   |  20241018141559079|20241018141559079...|                 2|           Canada/2023|c8045504-c666-4a6...|  2|Alice|2015-01-02T13:51:...| Canada|2023|
   |  20241018141559079|20241018141559079...|                 1|              USA/2023|deed664c-faf5-4ab...|  1| John|2015-01-01T13:51:...|    USA|2023|
   +-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
   */
   ```
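
To make the duplicate easier to spot than by reading the `show()` output, a small check along the following lines can be used. This is only a sketch: `crossPartitionDuplicates` is a hypothetical helper, not a Hudi API, and the input pairs are copied by hand from the "after" output above.

```scala
// Hypothetical helper (not part of Hudi): given (record key, partition path) pairs,
// return only the keys that occur in more than one partition.
def crossPartitionDuplicates(rows: Seq[(String, String)]): Map[String, Set[String]] =
  rows
    .groupBy { case (key, _) => key }
    .map { case (key, pairs) => key -> pairs.map(_._2).toSet }
    .filter { case (_, partitions) => partitions.size > 1 }

// (_hoodie_record_key, _hoodie_partition_path) pairs from the "after" output above.
val afterUpdate = Seq(
  ("3", "UK/2023"),
  ("1", "UK/2023"),
  ("2", "Canada/2023"),
  ("1", "USA/2023")
)

val duplicates = crossPartitionDuplicates(afterUpdate)
println(duplicates) // record key 1 is present in both UK/2023 and USA/2023
```

In spark-shell the pairs could instead be taken from the snapshot read by selecting `_hoodie_record_key` and `_hoodie_partition_path` and collecting them, which is only reasonable for a table as small as this one.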
   
   
   ### Code to demonstrate the correct expected behavior using GLOBAL_SIMPLE index
   
   ```
   import org.apache.spark.sql.{SaveMode, SparkSession}
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieIndexConfig
   import org.apache.hudi.index.HoodieIndex
   
   val basePath = "/Volumes/workplace/adhoc-work/some_hudi_path"
   val tableName = "my_hudi_table"
   val recordKeyField = "id"
   val preCombinedField = "last_update_time"
   val partitionPathField = "country,year"
   
   val session = SparkSession.builder().appName("app_name").master("local").getOrCreate()
   
   // Clear existing data (if any)
   session.sparkContext.hadoopConfiguration.set("fs.defaultFS", "file:///")
   val fs = org.apache.hadoop.fs.FileSystem.get(session.sparkContext.hadoopConfiguration)
   fs.delete(new org.apache.hadoop.fs.Path(basePath), true)
   
   val initialData = Seq(
     (1, "John", "USA", 2023, "2015-01-01T13:51:39.340396Z"),
     (2, "Alice", "Canada", 2023, "2015-01-02T13:51:39.340396Z"),
     (3, "Hulk", "UK", 2023, "2015-01-02T13:51:39.340396Z")
   ).toDF("id", "name", "country", "year", "last_update_time")
   
   initialData.write.format("hudi")
     .option("hoodie.table.name", tableName)
     .option(PRECOMBINE_FIELD.key, preCombinedField)
     .option(RECORDKEY_FIELD.key, recordKeyField)
     .option(PARTITIONPATH_FIELD.key, partitionPathField)
     .option(INSERT_DROP_DUPS.key, "false")
     .option(HoodieIndexConfig.INDEX_TYPE.key, HoodieIndex.IndexType.GLOBAL_SIMPLE.name())
     .option("hoodie.global.simple.index.partition.path.enable", "true")
     .option("hoodie.insert.shuffle.parallelism", "2")
     .option("hoodie.upsert.shuffle.parallelism", "2")
     .mode(SaveMode.Overwrite)
     .save(basePath)
   
   println("Data before partition-changing update:")
   session.read.format("hudi")
     .load(basePath)
     .show()
   
   // Updated data with partition change (country USA -> UK)
   val updatedData = Seq(
     (1, "John", "UK", 2023, "2015-01-03T13:51:39.340396Z") // Changed country from USA to UK
   ).toDF("id", "name", "country", "year", "last_update_time")
   
   // Perform the partition-changing update with GLOBAL_SIMPLE Index
   updatedData.write.format("hudi")
     .option("hoodie.table.name", tableName)
     .option(PRECOMBINE_FIELD.key, preCombinedField)
     .option(RECORDKEY_FIELD.key, recordKeyField)
     .option(PARTITIONPATH_FIELD.key, partitionPathField)
     .option(OPERATION.key, UPSERT_OPERATION_OPT_VAL)
     .option(HoodieIndexConfig.INDEX_TYPE.key, HoodieIndex.IndexType.GLOBAL_SIMPLE.name())
     .option("hoodie.global.simple.index.partition.path.enable", "true")
     .option("hoodie.insert.shuffle.parallelism", "2")
     .option("hoodie.upsert.shuffle.parallelism", "2")
     .mode(SaveMode.Append)
     .save(basePath)
   
   println("Data after partition-changing update:")
   session.read.format("hudi")
     .load(basePath)
     .show()
   
   ```
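
One difference between the two snippets that may be worth ruling out: the GLOBAL_BLOOM snippet selects the index with the literal key `hoodie.bloom.index.type`, while the GLOBAL_SIMPLE snippet uses `HoodieIndexConfig.INDEX_TYPE.key`, which resolves to `hoodie.index.type`. The sketch below collects the GLOBAL_BLOOM writer options with that key instead. Whether this changes the outcome has not been verified here, and `globalBloomOptions` is just an illustrative name.

```scala
// Writer options for the GLOBAL_BLOOM run. Identical to the first snippet except
// that the index is selected via "hoodie.index.type" (assumption: this is the key
// the writer reads to pick the index, as in the GLOBAL_SIMPLE snippet).
val globalBloomOptions: Map[String, String] = Map(
  "hoodie.table.name" -> "my_hudi_table",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.partitionpath.field" -> "country,year",
  "hoodie.datasource.write.precombine.field" -> "last_update_time",
  "hoodie.datasource.write.table.name" -> "my_hudi_table",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.index.type" -> "GLOBAL_BLOOM",
  "hoodie.bloom.index.update.partition.path" -> "true"
)
```

The map could be passed to both writes with `.options(globalBloomOptions)` in place of the individual `.option(...)` calls, after which the snapshot read can be repeated to see whether id 1 still appears under `USA/2023`.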
   
   **Expected behavior**
   
   There should be no duplicate records across partitions when I use the GLOBAL_BLOOM index.
   
   **Environment Description**
   
   * Hudi version : 0.15.0
   
   * Spark version : 3.5.0
   
   * Hive version : Not Applicable
   
   * Hadoop version : Not Applicable
   
   * Storage (HDFS/S3/GCS..) : Not Applicable
   
   * Running on Docker? (yes/no) : no
   
   
   

