abhisheksp opened a new issue, #12128:
URL: https://github.com/apache/hudi/issues/12128
**Describe the problem you faced**
I am observing duplicate records across partitions when using the GLOBAL_BLOOM
index. The duplicates do not appear with the GLOBAL_SIMPLE index.
Steps to reproduce the behavior:
1. Run spark shell
```
spark-shell --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
```
2. Run the GLOBAL_BLOOM index code below
3. Observe the final output and find duplicate records for record id = 1 (John)
4. Run the GLOBAL_SIMPLE index code and observe that there are no duplicates
### Code to reproduce the issue with GLOBAL_BLOOM index
```
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hudi.DataSourceReadOptions._

val basePath = "/Volumes/workplace/adhoc-work/some_hudi_path"
val tableName = "my_hudi_table"
val recordKeyField = "id"
val preCombinedField = "last_update_time"
val partitionPathField = "country,year"
val session = SparkSession.builder().appName("app_name").master("local").getOrCreate()
import session.implicits._ // needed for toDF below

// Clear any existing data at basePath
val fs = org.apache.hadoop.fs.FileSystem.get(session.sparkContext.hadoopConfiguration)
fs.delete(new org.apache.hadoop.fs.Path(basePath), true)
val initialData = Seq(
(1, "John", "USA", 2023, "2015-01-01T13:51:39.340396Z"),
(2, "Alice", "Canada", 2023, "2015-01-02T13:51:39.340396Z"),
(3, "Hulk", "UK", 2023, "2015-01-02T13:51:39.340396Z")
).toDF("id", "name", "country", "year", "last_update_time")
initialData.write.format("hudi")
.option("hoodie.table.name", tableName)
.option("hoodie.datasource.write.recordkey.field", recordKeyField)
.option("hoodie.datasource.write.partitionpath.field", partitionPathField)
.option("hoodie.datasource.write.precombine.field", preCombinedField)
.option("hoodie.datasource.write.table.name", tableName)
.option("hoodie.datasource.write.operation", "upsert")
.option("hoodie.bloom.index.type", "GLOBAL_BLOOM")
.option("hoodie.bloom.index.update.partition.path", "true")
.mode(SaveMode.Append)
.save(basePath)
println("Data before partition-changing update:")
session.read.format("hudi")
.option(QUERY_TYPE.key, QUERY_TYPE_SNAPSHOT_OPT_VAL)
.option("hoodie.table.name", tableName)
.option("hoodie.datasource.write.recordkey.field", recordKeyField)
.load(basePath)
.show()
/*
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id| name|    last_update_time|country|year|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
|  20241018141559079|20241018141559079...|                 2|           Canada/2023|c8045504-c666-4a6...|  2|Alice|2015-01-02T13:51:...| Canada|2023|
|  20241018141559079|20241018141559079...|                 1|              USA/2023|deed664c-faf5-4ab...|  1| John|2015-01-01T13:51:...|    USA|2023|
|  20241018141559079|20241018141559079...|                 3|               UK/2023|9d44bfff-2cbc-44b...|  3| Hulk|2015-01-02T13:51:...|     UK|2023|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
*/
// Updated data with partition change (country USA -> UK)
val updatedData = Seq(
  (1, "John", "UK", 2023, "2015-01-03T13:51:39.340396Z") // country changed from USA to UK
).toDF("id", "name", "country", "year", "last_update_time")
// Perform the partition-changing update with global Bloom index
updatedData.write.format("hudi")
.option("hoodie.table.name", tableName)
.option("hoodie.datasource.write.recordkey.field", recordKeyField)
.option("hoodie.datasource.write.partitionpath.field", partitionPathField)
.option("hoodie.datasource.write.table.name", tableName)
.option("hoodie.datasource.write.operation", "upsert")
.option("hoodie.datasource.write.precombine.field", preCombinedField)
.option("hoodie.bloom.index.type", "GLOBAL_BLOOM")
.option("hoodie.bloom.index.update.partition.path", "true")
.mode(SaveMode.Append)
.save(basePath)
println("Data after partition-changing update:")
session.read.format("hudi")
.option(QUERY_TYPE.key, QUERY_TYPE_SNAPSHOT_OPT_VAL)
.option("hoodie.table.name", tableName)
.option("hoodie.datasource.write.recordkey.field", recordKeyField)
.load(basePath)
.show()
/*
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id| name|    last_update_time|country|year|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
|  20241018141559079|20241018141559079...|                 3|               UK/2023|9d44bfff-2cbc-44b...|  3| Hulk|2015-01-02T13:51:...|     UK|2023|
|  20241018141639952|20241018141639952...|                 1|               UK/2023|9d44bfff-2cbc-44b...|  1| John|2015-01-03T13:51:...|     UK|2023|
|  20241018141559079|20241018141559079...|                 2|           Canada/2023|c8045504-c666-4a6...|  2|Alice|2015-01-02T13:51:...| Canada|2023|
|  20241018141559079|20241018141559079...|                 1|              USA/2023|deed664c-faf5-4ab...|  1| John|2015-01-01T13:51:...|    USA|2023|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+--------------------+-------+----+
*/
```
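To confirm the duplicate independently of the show() output, rows can be grouped by record key; any key with more than one row is duplicated across partitions. A minimal pure-Scala sketch of that check, applied to the (record key, partition path) pairs printed above:

```scala
// Each tuple is (_hoodie_record_key, _hoodie_partition_path) from the
// snapshot read after the partition-changing update.
val rowsAfterUpdate = Seq(
  ("3", "UK/2023"),
  ("1", "UK/2023"),     // updated copy of John
  ("2", "Canada/2023"),
  ("1", "USA/2023")     // stale copy of John that should have been removed
)

// Group by record key; keys appearing in more than one row are duplicates.
val duplicates = rowsAfterUpdate
  .groupBy(_._1)
  .collect { case (key, rows) if rows.size > 1 => key -> rows.map(_._2) }

println(duplicates)
// Map(1 -> List(UK/2023, USA/2023))
```

The same check can be run directly on the table with `session.read.format("hudi").load(basePath).groupBy("_hoodie_record_key").count()` and filtering for counts greater than one.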
### Code to demonstrate the correct expected behavior using GLOBAL_SIMPLE index
```
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.index.HoodieIndex

val basePath = "/Volumes/workplace/adhoc-work/some_hudi_path"
val tableName = "my_hudi_table"
val recordKeyField = "id"
val preCombinedField = "last_update_time"
val partitionPathField = "country,year"
val session = SparkSession.builder().appName("app_name").master("local").getOrCreate()
import session.implicits._ // needed for toDF below

// Clear existing data (if any)
session.sparkContext.hadoopConfiguration.set("fs.defaultFS", "file:///")
val fs = org.apache.hadoop.fs.FileSystem.get(session.sparkContext.hadoopConfiguration)
fs.delete(new org.apache.hadoop.fs.Path(basePath), true)
val initialData = Seq(
(1, "John", "USA", 2023, "2015-01-01T13:51:39.340396Z"),
(2, "Alice", "Canada", 2023, "2015-01-02T13:51:39.340396Z"),
(3, "Hulk", "UK", 2023, "2015-01-02T13:51:39.340396Z")
).toDF("id", "name", "country", "year", "last_update_time")
initialData.write.format("hudi")
.option("hoodie.table.name", tableName)
.option(PRECOMBINE_FIELD.key, preCombinedField)
.option(RECORDKEY_FIELD.key, recordKeyField)
.option(PARTITIONPATH_FIELD.key, partitionPathField)
.option(INSERT_DROP_DUPS.key, "false")
.option(HoodieIndexConfig.INDEX_TYPE.key,
HoodieIndex.IndexType.GLOBAL_SIMPLE.name())
.option("hoodie.global.simple.index.partition.path.enable", "true")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
.mode(SaveMode.Overwrite)
.save(basePath)
println("Data before partition-changing update:")
session.read.format("hudi")
.load(basePath)
.show()
// Updated data with partition change (country USA -> UK)
val updatedData = Seq(
  (1, "John", "UK", 2023, "2015-01-03T13:51:39.340396Z") // country changed from USA to UK
).toDF("id", "name", "country", "year", "last_update_time")
// Perform the partition-changing update with GLOBAL_SIMPLE Index
updatedData.write.format("hudi")
.option("hoodie.table.name", tableName)
.option(PRECOMBINE_FIELD.key, preCombinedField)
.option(RECORDKEY_FIELD.key, recordKeyField)
.option(PARTITIONPATH_FIELD.key, partitionPathField)
.option(OPERATION.key, UPSERT_OPERATION_OPT_VAL)
.option(HoodieIndexConfig.INDEX_TYPE.key,
HoodieIndex.IndexType.GLOBAL_SIMPLE.name())
.option("hoodie.global.simple.index.partition.path.enable", "true")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
.mode(SaveMode.Append)
.save(basePath)
println("Data after partition-changing update:")
session.read.format("hudi")
.load(basePath)
.show()
```
**Expected behavior**
There should be no duplicate records when using the GLOBAL_BLOOM index; after the partition-changing update, record id = 1 should exist only in the new partition (UK/2023), as happens with GLOBAL_SIMPLE.
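With `hoodie.bloom.index.update.partition.path=true`, a global index is expected to route a partition-changing upsert as two operations: a delete of the record in its old partition and a write into the new one. A pure-Scala sketch of that expectation (the names here are illustrative only, not Hudi's internal API):

```scala
// Illustrative model of how a global index with update.partition.path=true
// should tag an incoming record whose partition has changed.
sealed trait Op
case class Delete(key: String, partition: String) extends Op
case class Upsert(key: String, partition: String) extends Op

def tagRecord(existing: Map[String, String], // record key -> current partition
              key: String,
              newPartition: String): Seq[Op] =
  existing.get(key) match {
    case Some(old) if old != newPartition =>
      // Partition changed: delete the old copy, write the new one.
      Seq(Delete(key, old), Upsert(key, newPartition))
    case _ =>
      // New key, or same partition: a plain upsert suffices.
      Seq(Upsert(key, newPartition))
  }

// John (id = 1) moves from USA/2023 to UK/2023:
println(tagRecord(Map("1" -> "USA/2023"), "1", "UK/2023"))
// List(Delete(1,USA/2023), Upsert(1,UK/2023))
```

In the GLOBAL_BLOOM run above, the behavior observed is as if only the Upsert were emitted, leaving the stale USA/2023 copy in place.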
**Environment Description**
* Hudi version : 0.15.0
* Spark version : 3.5.0
* Hive version : Not Applicable
* Hadoop version : Not Applicable
* Storage (HDFS/S3/GCS..) : Not Applicable
* Running on Docker? (yes/no) : no