soumilshah1995 opened a new issue, #9210:
URL: https://github.com/apache/hudi/issues/9210
I am attempting to build a feature that compacts hour-level partitions (YYYY/MM/DD/HH) into day-level partitions (YYYY/MM/DD).
I almost have the logic working, but I am stuck because Hudi is not allowing me to overwrite the partition path.
Here is sample code.
# Step 1: Create a Hudi table partitioned by YYYY/MM/DD/HH
```
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Pull in the Hudi Spark bundle and run PySpark with the local interpreter
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

db_name = "hudidb"
table_name = "hudi_table"
recordkey = 'uuid'
precombine = 'uuid'
path = f"file:///C:/tmp/{db_name}/{table_name}"
method = 'upsert'
table_type = "COPY_ON_WRITE"  # COPY_ON_WRITE | MERGE_ON_READ

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.write.partitionpath.field': 'year,month,day,hour',
}

data_items = [
    (1, "mess 1", 111, "2021", "01", "01", "3"),
    (2, "mes 2", 22, "2021", "01", "01", "4"),
]
columns = ["uuid", "message", "precomb", "year", "month", "day", "hour"]
spark_df = spark.createDataFrame(data=data_items, schema=columns)
spark_df.show()
spark_df.printSchema()

spark_df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save(path)
```
# Step 2: Logic
* find the hour-level partitions for the day
* rewrite the data with insert_overwrite into the day-level partition
* delete the old hour-level partitions
```
db_name = "hudidb"
table_name = "hudi_table"
recordkey = 'uuid'
precombine = 'uuid'

# Read everything under the day we want to compact (all hour partitions)
partition_compaction = "2021/01/01"
hudi_path = f"file:///C:/tmp/{db_name}/{table_name}/{partition_compaction}"

hour_partitions = spark.read.format("hudi") \
    .load(hudi_path)

# Drop the Hudi metadata columns before rewriting the data
cleansed_columns = [c for c in hour_partitions.columns if "_hoodie" not in c]
hour_partitions_cleansed = hour_partitions.select(cleansed_columns)
hour_partitions_cleansed.show()
```
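Note: `hour_partitions_cleansed` only carries the original `year`, `month`, `day`, and `hour` columns, so the `date` field used below as record key/partition path has to be derived first. A minimal sketch, assuming a `YYYY/MM/DD`-style value built from the existing columns (the `concat_ws` expression is my addition, not part of the failing code):
```
from pyspark.sql.functions import concat_ws

# Assumption: derive a day-level "date" column from year/month/day;
# adjust the format if the real table uses something different.
hour_partitions_cleansed = hour_partitions_cleansed.withColumn(
    "date", concat_ws("/", "year", "month", "day")
)
```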
```
def write_hudi_table(db_name,
                     table_name,
                     recordkey,
                     precombine,
                     partition_column,
                     spark_df,
                     method='upsert'):
    try:
        hudi_options = {
            'hoodie.table.name': table_name,
            'hoodie.datasource.write.recordkey.field': recordkey,
            'hoodie.datasource.write.table.name': table_name,
            'hoodie.datasource.write.operation': method,
            'hoodie.datasource.write.precombine.field': precombine,
            'hoodie.datasource.write.partitionpath.field': partition_column,
        }
        path = f"file:///C:/tmp/{db_name}/{table_name}"
        spark_df.write.format("hudi") \
            .options(**hudi_options) \
            .mode("append") \
            .save(path)
    except Exception as e:
        print("write failed:", e)
```
## When calling insert_overwrite
```
write_hudi_table(
db_name="hudidb",
table_name="hudi_table",
recordkey="uuid",
precombine="date",
partition_column="date",
spark_df=hour_partitions_cleansed,
method='insert_overwrite',
)
```
# Error
```
e An error occurred while calling o73.save.
: org.apache.hudi.exception.HoodieException: Config conflict(key  current value  existing value):
PreCombineKey:  date  uuid
PartitionPath:  date  year,month,day,hour
    at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:171)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:131)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:578)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:1589)
```
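The conflict comes from the table config pinned in `.hoodie/hoodie.properties`: the table was created with precombine field `uuid` and partition path `year,month,day,hour`, and `HoodieWriterUtils.validateTableConfig` rejects any later write to the same base path that changes either of them. One possible workaround is to write the compacted day-level data to a separate table, which gets its own table config. A minimal sketch using the `write_hudi_table` helper above (the `hudi_table_daily` name is hypothetical):
```
# Assumption: a new day-partitioned table avoids the config conflict,
# because each Hudi base path stores its own precombine/partitionpath
# fields in .hoodie/hoodie.properties.
write_hudi_table(
    db_name="hudidb",
    table_name="hudi_table_daily",  # hypothetical target table
    recordkey="uuid",
    precombine="uuid",              # keep the original precombine field
    partition_column="date",        # the derived YYYY/MM/DD column
    spark_df=hour_partitions_cleansed,
    method='insert_overwrite',
)
```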
# Then I am aiming to delete the older partitions
```
def delete_partition(partition_to_be_deleted, hudi_path):
    try:
        # Issue a delete_partition write; the dataframe itself can be empty,
        # the partitions are passed via the write option below
        spark.createDataFrame([], StructType([])) \
            .write \
            .format("org.apache.hudi") \
            .option("hoodie.datasource.hive_sync.enable", False) \
            .option("hoodie.datasource.write.operation", "delete_partition") \
            .option("hoodie.datasource.write.partitions.to.delete",
                    partition_to_be_deleted) \
            .mode("append") \
            .save(hudi_path)
        return True
    except Exception as e:
        print("delete failed:", e)
        return False
```
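For completeness, a hypothetical call that would drop the hour-level partitions created in Step 1 once the day-level rewrite succeeds; `hoodie.datasource.write.partitions.to.delete` accepts a comma-separated list of partition paths:
```
# Assumption: hour partitions "3" and "4" under 2021/01/01, per Step 1 data
delete_partition(
    partition_to_be_deleted="2021/01/01/3,2021/01/01/4",
    hudi_path="file:///C:/tmp/hudidb/hudi_table",
)
```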