mzheng-plaid commented on issue #9977:
URL: https://github.com/apache/hudi/issues/9977#issuecomment-1839667223
@ad1happy2go @codope I was able to reproduce the issue with the following
Spark code (a 50000-row dataset). The problem appears to be related to the
handling of array fields nested inside structs. Could you confirm whether
you can reproduce it with this code?
```
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
from pyspark.sql import types as T
import uuid
from pyspark.sql import Row
import random

# Base Hudi write options: a MERGE_ON_READ table keyed on the nested field
# "id.value" and partitioned by "partition". Small-file handling and the
# metadata table are disabled so clustering behavior is isolated.
hudi_options = {
    "hoodie.table.name": "clustering_bug_test",
    "hoodie.datasource.write.recordkey.field": "id.value",
    "hoodie.datasource.write.partitionpath.field": "partition:SIMPLE",
    "hoodie.datasource.write.table.name": "clustering_bug_test",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "publishedAtUnixNano",
    "hoodie.datasource.write.payload.class":
        "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
    "hoodie.compaction.payload.class":
        "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
    # Turn off small file optimizations
    "hoodie.parquet.small.file.limit": "0",
    # Turn off metadata table
    "hoodie.metadata.enable": "false",
    "hoodie.datasource.write.keygenerator.class":
        "org.apache.hudi.keygen.CustomKeyGenerator",
    # Hive style partitioning
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.cleaner.commits.retained": 1,
    "hoodie.bootstrap.index.enable": "false",
    "hoodie.commits.archival.batch": 5,
    # Bloom filter
    "hoodie.index.type": "BLOOM",
    "hoodie.bloom.index.prune.by.ranges": "false",
}

# Same options plus inline clustering after every commit, sorted by the
# nested record key -- the configuration that triggers the row loss.
clustering_hudi_options = {
    **hudi_options,
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": 1,
    "hoodie.clustering.plan.strategy.small.file.limit": 256 * 1024 * 1024,
    "hoodie.clustering.plan.strategy.target.file.max.bytes": 512 * 1024 * 1024,
    "hoodie.clustering.plan.strategy.sort.columns": "id.value",
    "hoodie.clustering.plan.strategy.max.num.groups": 300,
}

# Fixed seed so the generated payload (and therefore file sizes) is
# reproducible across runs.
random.seed(10)

# 50000 rows containing both an array-in-struct column and an array nested
# two struct levels deep -- the shapes that appear to trigger the bug.
dummy_data = [
    Row(
        id=Row(value=str(uuid.uuid4())),
        publishedAtUnixNano=i,
        partition="1",
        struct_array_column=Row(
            element=[str(random.randint(0, 1000000))
                     for i in range(random.randint(1, 100))],
        ),
        struct_column=Row(
            nested_array_column=Row(
                element=[str(random.randint(0, 1000000))
                         for i in range(random.randint(1, 100))],
            ),
        ),
        # This padding ensures files are large enough to reproduce the data loss
        **{
            f"col_{i}": str(uuid.uuid4())
            for i in range(100)
        },
    )
    for i in range(50000)
]

df_dummy = spark.createDataFrame(dummy_data)

# This was tested in S3
PATH = f"{OUTPUT_PATH}"

# Initial bulk write without clustering enabled.
df_dummy.write.format("hudi").options(**hudi_options).mode("append").save(PATH)

read_df = spark.read.format("hudi").load(PATH)
data = read_df.take(1)
init_count = read_df.count()

# This upsert should be a no-op (re-writing 1 existing row), but the inline
# clustering it triggers drops rows from the table.
upsert_df = spark.createDataFrame(data, read_df.schema)
upsert_df.write.format("hudi").options(**clustering_hudi_options).mode("append").save(PATH)

read_df = spark.read.format("hudi").load(PATH)
final_count = read_df.count()

# init_count and final_count should match; a mismatch demonstrates the bug.
print(f"{init_count}, {final_count}")
```
The schema is:
```
root
|-- id: struct (nullable = true)
| |-- value: string (nullable = true)
|-- publishedAtUnixNano: long (nullable = true)
|-- partition: string (nullable = true)
|-- struct_array_column: struct (nullable = true)
| |-- element: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- struct_column: struct (nullable = true)
| |-- nested_array_column: struct (nullable = true)
| | |-- element: array (nullable = true)
| | | |-- element: string (containsNull = true)
|-- col_0: string (nullable = true)
|-- col_1: string (nullable = true)
|-- col_2: string (nullable = true)
|-- col_3: string (nullable = true)
...
```
We expect `init_count` and `final_count` to be equal, but the actual output
is (the exact numbers may vary between runs):
```
50000, 48000
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]