soumilshah1995 opened a new issue, #9210:
URL: https://github.com/apache/hudi/issues/9210

   I am attempting to build a feature that compacts YYYY/MM/DD/HH partitions into YYYY/MM/DD.
   
   I almost have the logic working, but I am stuck because Hudi is not allowing me to overwrite the partition path.
   
   Here is sample code:
   
   # Step 1: Create Hudi table partitioned by YYYY/MM/DD/HH
   ```
   import os
   import sys
   
   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType  # used later by delete_partition
   
   SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
   
   spark = SparkSession.builder \
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   
   db_name = "hudidb"
   table_name = "hudi_table"
   recordkey = 'uuid'
   precombine = 'uuid'
   path = f"file:///C:/tmp/{db_name}/{table_name}"
   method = 'upsert'
   table_type = "COPY_ON_WRITE"  # COPY_ON_WRITE | MERGE_ON_READ
   
   hudi_options = {
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.precombine.field': precombine,
       'hoodie.datasource.write.partitionpath.field': 'year,month,day,hour',
   }
   
   data_items = [
       (1, "mess 1", 111, "2021", "01", "01", "3"),
       (2, "mes 2", 22, "2021", "01", "01", "4"),
   ]
   
   columns = ["uuid", "message", "precomb", "year", "month", "day", "hour"]
   
   spark_df = spark.createDataFrame(data=data_items, schema=columns)
   spark_df.show()
   spark_df.printSchema()
   spark_df.write.format("hudi") \
       .options(**hudi_options) \
       .mode("append") \
       .save(path)
   
   ```
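   
   For reference, this write should produce hour-level partition directories under the table path ("2021/01/01/3" and "2021/01/01/4", since hive-style partitioning is not enabled). A quick sanity check, reading the partition paths back via Hudi's `_hoodie_partition_path` metadata column:
   
   ```
   # confirm the hour-level layout before compacting
   spark.read.format("hudi").load(path) \
       .select("_hoodie_partition_path", "uuid", "message") \
       .show(truncate=False)
   ```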
   
   # Step 2: Logic
   * find the hour-level partitions
   * use insert_overwrite to rewrite them at day level
   * delete the old partitions
   
   ```
   
   db_name = "hudidb"
   table_name = "hudi_table"
   recordkey = 'uuid'
   precombine = 'uuid'
   partition_compaction = "2021/01/01"
   hudi_path = f"file:///C:/tmp/{db_name}/{table_name}/{partition_compaction}"
   
   hour_partitions = spark.read.format("hudi") \
       .load(hudi_path)
   
   cleansed_columns = [col for col in hour_partitions.columns if "_hoodie" not 
in col]
   hour_partitions_cleansed = hour_partitions.select(cleansed_columns)
   hour_partitions_cleansed.show()
   
   
   ```
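   
   Note that `hour_partitions_cleansed` only carries the original `year`, `month`, `day`, and `hour` columns, so the `date` column used as the new partition path below has to be derived first. A minimal sketch, assuming a slash-joined `concat_ws` over the existing columns is an acceptable day-level key:
   
   ```
   from pyspark.sql.functions import concat_ws
   
   # derive a day-level partition column (assumed name: "date"),
   # e.g. "2021/01/01", from the existing year/month/day columns
   hour_partitions_cleansed = hour_partitions_cleansed.withColumn(
       "date", concat_ws("/", "year", "month", "day")
   )
   ```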
   
   ```
   
   def write_hudi_table(db_name,
                        table_name,
                        recordkey,
                        precombine,
                        partition_column,
                        spark_df,
                        method='upsert',
                        ):
       try:
           hudi_options = {
               'hoodie.table.name': table_name,
               'hoodie.datasource.write.recordkey.field': recordkey,
               'hoodie.datasource.write.table.name': table_name,
               'hoodie.datasource.write.operation': method,
               'hoodie.datasource.write.precombine.field': precombine,
               'hoodie.datasource.write.partitionpath.field': partition_column,
           }
           path = f"file:///C:/tmp/{db_name}/{table_name}"
   
           spark_df.write.format("hudi") \
               .options(**hudi_options) \
               .mode("append") \
               .save(path)
       except Exception as e:
           print("e", e)
   ```
    
   ## When calling insert_overwrite
   
   ```
   write_hudi_table(
       db_name="hudidb",
       table_name="hudi_table",
       recordkey="uuid",
       precombine="date",
       partition_column="date",
       spark_df=hour_partitions_cleansed,
       method='insert_overwrite',
   )
   
   ```
   # Error
   ```
   e An error occurred while calling o73.save.
   : org.apache.hudi.exception.HoodieException: Config conflict(key   current value   existing value):
   PreCombineKey:   date   uuid
   PartitionPath:   date   year,month,day,hour
       at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:171)
       at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:131)
       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
       at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
       at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
       at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
       at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
       at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
       at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
       at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
       at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
       at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
       at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
       at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
       at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
       at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
       at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
       at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
       at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
       at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
       at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
       at java.base/java.lang.reflect.Method.invoke(Method.java:578)
       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
       at py4j.Gateway.invoke(Gateway.java:282)
       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
       at py4j.commands.CallCommand.execute(CallCommand.java:79)
       at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
       at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
       at java.base/java.lang.Thread.run(Thread.java:1589)
   ```
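   
   The conflict comes from the table config stored in `hoodie.properties`: the table was created with precombine `uuid` and partition path `year,month,day,hour`, and `HoodieWriterUtils.validateTableConfig` rejects any later write that changes either value, so an existing table's partition path cannot simply be overwritten in place. A minimal sketch of one workaround, writing the compacted day-level data into a separate table (the table name `hudi_table_daily` is only an assumption for illustration):
   
   ```
   # the original table's partitionpath/precombine are pinned in
   # hoodie.properties, so write the day-level rows to a new table
   write_hudi_table(
       db_name="hudidb",
       table_name="hudi_table_daily",   # hypothetical target table
       recordkey="uuid",
       precombine="uuid",               # keep the source table's precombine field
       partition_column="date",         # derived earlier with concat_ws
       spark_df=hour_partitions_cleansed,
       method='insert_overwrite',
   )
   ```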
   
   
   # Then I am aiming to delete the older partitions
   ```
   
   def delete_partition(partition_to_be_deleted, hudi_path):
       try:
           # an empty DataFrame is enough here: delete_partition only
           # reads the partition list, not any records
           spark.createDataFrame([], StructType([])) \
               .write \
               .format("org.apache.hudi") \
               .option("hoodie.datasource.hive_sync.enable", False) \
               .option("hoodie.datasource.write.operation", "delete_partition") \
               .option("hoodie.datasource.write.partitions.to.delete", partition_to_be_deleted) \
               .mode("append") \
               .save(hudi_path)
           return True
       except Exception as e:
           print("Delete failed:", e)  # surface the failure instead of swallowing it
           return False
   ```
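   
   A hedged usage sketch, assuming the two hour-level partitions written in Step 1 are the ones being retired; `hoodie.datasource.write.partitions.to.delete` accepts a comma-separated list, and the save path should be the table root rather than an individual partition directory:
   
   ```
   # table root, not a partition sub-directory
   table_path = "file:///C:/tmp/hudidb/hudi_table"
   
   # the hour-level partitions created in Step 1
   ok = delete_partition("2021/01/01/3,2021/01/01/4", table_path)
   print("partitions deleted:", ok)
   ```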

