rangareddy commented on issue #14995:
URL: https://github.com/apache/hudi/issues/14995#issuecomment-3659615896

   Hi, I have tested this with the latest master branch; both bulk_insert and upsert modes work as expected.
   
   ```sh
   export SPARK_VERSION=3.5
   export HUDI_VERSION=1.1.0
   export SCALA_VERSION=2.12
   
   pyspark --master "local[2]" \
     --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_$SCALA_VERSION:$HUDI_VERSION \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
     --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
     --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
     --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
   ```
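   
   A small plain-Python helper (hypothetical, not part of the test run above) that builds the same bundle coordinate from those version variables, which helps avoid typos when switching Spark/Scala/Hudi versions:
   
   ```python
   # Build the Maven coordinate for the Hudi Spark bundle from version strings,
   # mirroring the shell variables used in the pyspark invocation above.
   SPARK_VERSION = "3.5"
   HUDI_VERSION = "1.1.0"
   SCALA_VERSION = "2.12"
   
   hudi_bundle = (
       f"org.apache.hudi:hudi-spark{SPARK_VERSION}"
       f"-bundle_{SCALA_VERSION}:{HUDI_VERSION}"
   )
   print(hudi_bundle)  # org.apache.hudi:hudi-spark3.5-bundle_2.12:1.1.0
   ```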
   
   **Bulk Insert with Row Writer Enabled:**
   
   ```python
   from pyspark.sql import functions as f
   
   table_name = 'hudi_timestamp_test'
   target_path = f'/tmp/{table_name}/'
   
   input_df = spark.createDataFrame(
       [
           ("100", "Mo", "2015-01-01", "2015-01-01T13:51:39.345397Z"),
           ("101", "Rula", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
           ("102", "Sari", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
           ("103", "Jo", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
           ("104", "Dani", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
           ("105", "Abrar", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
       ],
       ("id", "artist", "created", "updated"),
   )
   input_df.show(truncate=False)
   
   input_df = input_df.withColumn('updated', f.to_timestamp(f.col('updated')))
   input_df.show(truncate=False)
   
   hudi_config = {
       "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.bulkinsert.shuffle.parallelism": 10,
       "hoodie.upsert.shuffle.parallelism": 10,
       "hoodie.datasource.write.operation": "bulk_insert",
       "hoodie.datasource.write.row.writer.enable": "true",
       "hoodie.datasource.write.recordkey.field": "id",
       "hoodie.datasource.write.precombine.field": "updated",
       "hoodie.datasource.write.partitionpath.field": "created",
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.table.name": table_name,
       "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
       "hoodie.datasource.hive_sync.table": table_name,
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.datasource.hive_sync.support_timestamp": "true",
       "hoodie.datasource.hive_sync.metastore.uris": "thrift://hivemetastore:9083",
       "hoodie.datasource.hive_sync.partition_fields": "created",
   }
   
   
   input_df.write.format('hudi').options(**hudi_config).mode('overwrite').save(target_path)
   
   spark.read.format('hudi').load(target_path).select("_hoodie_commit_time", "id", "updated").show(10, False)
   +-------------------+---+--------------------------+
   |_hoodie_commit_time|id |updated                   |
   +-------------------+---+--------------------------+
   |20251216071023609  |100|2015-01-01 13:51:39.345397|
   |20251216071023609  |101|2015-01-01 12:14:58.597216|
   |20251216071023609  |102|2015-01-01 13:51:40.417052|
   |20251216071023609  |104|2015-01-01 12:15:00.512679|
   |20251216071023609  |105|2015-01-01 13:51:42.248818|
   |20251216071023609  |103|2015-01-01 13:51:40.519832|
   +-------------------+---+--------------------------+
   ```
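   
   The microsecond component of the input strings survives the round trip (e.g. `345397` for id 100). As a quick Spark-independent sanity check, plain Python parses the same value at full precision:
   
   ```python
   from datetime import datetime, timezone
   
   # Parse one of the input timestamps used above; %f captures microseconds
   # and the trailing 'Z' marks UTC.
   ts = datetime.strptime("2015-01-01T13:51:39.345397Z", "%Y-%m-%dT%H:%M:%S.%fZ")
   ts = ts.replace(tzinfo=timezone.utc)
   print(ts.microsecond)  # 345397
   ```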
   
   **Upsert:**
   
   ```python
   table_name = 'hudi_upsert_timestamp_test'
   target_path = f'/tmp/{table_name}/'
   
   hudi_config = {
       "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.bulkinsert.shuffle.parallelism": 10,
       "hoodie.upsert.shuffle.parallelism": 10,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.recordkey.field": "id",
       "hoodie.datasource.write.precombine.field": "updated",
       "hoodie.datasource.write.partitionpath.field": "created",
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.table.name": table_name,
       "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
       "hoodie.datasource.hive_sync.table": table_name,
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.datasource.hive_sync.support_timestamp": "true",
       "hoodie.datasource.hive_sync.metastore.uris": "thrift://hivemetastore:9083",
       "hoodie.datasource.hive_sync.partition_fields": "created",
   }
   
   
   input_df.write.format('hudi').options(**hudi_config).mode('overwrite').save(target_path)
   
   spark.read.format('hudi').load(target_path).select("_hoodie_commit_time", "id", "updated").show(10, False)
   +-------------------+---+--------------------------+
   |_hoodie_commit_time|id |updated                   |
   +-------------------+---+--------------------------+
   |20251216092512987  |100|2015-01-01 13:51:39.345397|
   |20251216092512987  |101|2015-01-01 12:14:58.597216|
   |20251216092512987  |102|2015-01-01 13:51:40.417052|
   |20251216092512987  |103|2015-01-01 13:51:40.519832|
   |20251216092512987  |104|2015-01-01 12:15:00.512679|
   |20251216092512987  |105|2015-01-01 13:51:42.248818|
   +-------------------+---+--------------------------+
   ```
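   
   Upsert resolves duplicate record keys using the precombine field (`updated`): per key, the row with the greatest value wins. A minimal plain-Python sketch of that dedup rule (illustrative only, not Hudi's implementation; the second row for id 100 is a hypothetical later update, and ISO-8601 strings compare correctly lexicographically):
   
   ```python
   # Sketch of precombine semantics: for duplicate keys, keep the record
   # whose precombine field ("updated") is greatest.
   records = [
       ("100", "2015-01-01T13:51:39.345397Z"),
       ("100", "2016-06-01T00:00:00.000000Z"),  # hypothetical later update
       ("101", "2015-01-01T12:14:58.597216Z"),
   ]
   
   latest = {}
   for key, updated in records:
       if key not in latest or updated > latest[key]:
           latest[key] = updated
   
   print(latest["100"])  # 2016-06-01T00:00:00.000000Z
   ```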

