mzheng-plaid commented on issue #9977:
URL: https://github.com/apache/hudi/issues/9977#issuecomment-1839667223

   @ad1happy2go @codope I was able to reproduce this with the following Spark 
code (50,000-row dataset). The problem appears to be related to the handling 
of array fields in structs. Could you confirm whether you're able to reproduce 
it using this code?
   
   ```
   from pyspark.sql.types import StringType
   from pyspark.sql import functions as F
   from pyspark.sql import types as T
   
   import uuid
   from pyspark.sql import Row
   import random
   
   
   hudi_options = {
       "hoodie.table.name": "clustering_bug_test",
       "hoodie.datasource.write.recordkey.field": "id.value",
       "hoodie.datasource.write.partitionpath.field": "partition:SIMPLE",
       "hoodie.datasource.write.table.name": "clustering_bug_test",
       "hoodie.datasource.write.table.type": "MERGE_ON_READ",
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "publishedAtUnixNano",
       "hoodie.datasource.write.payload.class": 
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
       "hoodie.compaction.payload.class": 
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
       # Turn off small file optimizations
       "hoodie.parquet.small.file.limit": "0",
       # Turn off metadata table
       "hoodie.metadata.enable": "false",
       "hoodie.datasource.write.keygenerator.class": 
"org.apache.hudi.keygen.CustomKeyGenerator",
       # Hive style partitioning
       "hoodie.datasource.write.hive_style_partitioning": "true",
       'hoodie.cleaner.commits.retained': 1,
       "hoodie.bootstrap.index.enable": "false",
       'hoodie.commits.archival.batch': 5,
       # Bloom filter
       "hoodie.index.type": "BLOOM",
       'hoodie.bloom.index.prune.by.ranges': 'false', 
   }
   clustering_hudi_options = {
       **hudi_options,
       "hoodie.clustering.inline": "true",
       "hoodie.clustering.inline.max.commits": 1,
       "hoodie.clustering.plan.strategy.small.file.limit": 256 * 1024 * 1024,
       "hoodie.clustering.plan.strategy.target.file.max.bytes": 512 * 1024 * 
1024,
       "hoodie.clustering.plan.strategy.sort.columns": "id.value",
       "hoodie.clustering.plan.strategy.max.num.groups": 300,
   }
   
   random.seed(10)
   dummy_data = [
       Row(
           id=Row(value=str(uuid.uuid4())),
           publishedAtUnixNano=i,
           partition="1",
           struct_array_column=Row(
               element=[str(random.randint(0, 1000000)) for i in 
range(random.randint(1, 100))],
           ),
           struct_column=Row(
               nested_array_column=Row(
                   element=[str(random.randint(0, 1000000)) for i in 
range(random.randint(1, 100))],
               ),
           ),
           # This padding ensures files are large enough to reproduce the data 
loss
           **{
               f"col_{i}": str(uuid.uuid4())
               for i in range(100)
           },
       )
       for i in range(50000)
   ]
   df_dummy = spark.createDataFrame(dummy_data)
   
   # This was tested in S3
   PATH = f"{OUTPUT_PATH}"
   
df_dummy.write.format("hudi").options(**hudi_options).mode("append").save(PATH)
   
   read_df = spark.read.format("hudi").load(PATH)
   data = read_df.take(1)
   init_count = read_df.count()
   
   # This upsert should be a no-op (re-writing 1 existing row)
   upsert_df = spark.createDataFrame(data, read_df.schema)
   
upsert_df.write.format("hudi").options(**clustering_hudi_options).mode("append").save(PATH)
   
   read_df = spark.read.format("hudi").load(PATH)
   final_count = read_df.count()
   print(f"{init_count}, {final_count}")
   ```
   
   The schema is:
   ```
   root
    |-- id: struct (nullable = true)
    |    |-- value: string (nullable = true)
    |-- publishedAtUnixNano: long (nullable = true)
    |-- partition: string (nullable = true)
    |-- struct_array_column: struct (nullable = true)
    |    |-- element: array (nullable = true)
    |    |    |-- element: string (containsNull = true)
    |-- struct_column: struct (nullable = true)
    |    |-- nested_array_column: struct (nullable = true)
    |    |    |-- element: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |-- col_0: string (nullable = true)
    |-- col_1: string (nullable = true)
    |-- col_2: string (nullable = true)
    |-- col_3: string (nullable = true)
   ...
   ```
   
   We expect init_count and final_count to be equal, but the actual output is 
(the exact numbers may vary between runs):
   ```
   50000, 48000
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to