rangareddy commented on issue #11918:
URL: https://github.com/apache/hudi/issues/11918#issuecomment-2360358299
Hi @robertchinezon
I attempted to reproduce the aforementioned issue using the following code,
but was unable to replicate it. Could you please verify if you can
reproduce the issue in your cluster by replacing the S3 bucket name in
`table_path`?
```python
# Reproduction attempt for Hudi issue #11918: repeated upsert batches into a
# partitioned COPY_ON_WRITE table with metadata table + column-stats index
# enabled, synced to the Glue/Hive catalog.
table_name = "hudi_test_11918_s3"
glue_database = "default"
record_id = "id"
partition_col_nm = "partition_column"
precomb_key = "name"
# NOTE(review): replace the placeholder with a full S3 URI before running,
# e.g. "s3://my-bucket/hudi_test_11918_s3".
table_path = f"<s3_bucket_name>/{table_name}"

hudi_options = {
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": record_id,
    "hoodie.datasource.write.precombine.field": precomb_key,
    "hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "false",
    "hoodie.datasource.write.reconcile.schema": "true",
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
    "hoodie.index.type": "BLOOM",
    "hoodie.parquet.compression.codec": "gzip",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": glue_database,
    "hoodie.datasource.hive_sync.table": table_name,
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.clean.automatic": "true",
    "hoodie.clean.async": "false",
    "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
    "hoodie.cleaner.fileversions.retained": "3",
    "hoodie.cleaner.commits.retained": "5",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.partitionpath.field": partition_col_nm,
    "hoodie.datasource.hive_sync.partition_fields": partition_col_nm,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.parquet.max.file.size": f"{512 * 1024 * 1024}",  # 512MB
    "hoodie.parquet.small.file.limit": "104857600",  # 100MB
    "hoodie.metadata.record.index.enable": "false",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.async": "false",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.enable.data.skipping": "true",
    "hoodie.metadata.index.check.timeout.seconds": "900",
    "hoodie.write.concurrency.mode": "single_writer",
    "hoodie.markers.timeline_server_based.batch.interval_ms": "100",
    "hoodie.filesystem.view.remote.retry.enable": "true",
    "hoodie.record.index.update.partition.path": "true",
    "hoodie.metadata.record.index.min.filegroup.count": "10",
}

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import random

hudi_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("partition_column", StringType(), True),
    StructField("data", StringType(), True),
])

iterations = 35
records_per_iteration = 4
num_partitions = 4

for i in range(iterations):
    batch_data = []
    for row in range(records_per_iteration):
        # Globally unique key across all batches; renamed from `id`
        # to avoid shadowing the builtin.
        rec_id = i * records_per_iteration + row + 1
        name = f"Record {rec_id}"
        # Records are spread randomly over num_partitions partition values.
        partition_column = f"partition_value_{random.randrange(num_partitions) + 1}"
        data = f"data_{rec_id}"
        batch_data.append({
            "id": rec_id,
            "name": name,
            "partition_column": partition_column,
            "data": data,
        })
    print(f"Inserting the Batch {i+1} data")
    df = spark.createDataFrame(data=batch_data, schema=hudi_schema)
    # The first batch bootstraps the table (overwrite); every later batch
    # upserts into it (append + hoodie upsert operation above).
    write_mode = "overwrite" if i == 0 else "append"
    df.write.format("hudi").options(**hudi_options).mode(write_mode).save(table_path)

hudi_df = spark.read.format("hudi").load(table_path)
print(f"Total Records: {hudi_df.count()}")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]