soumilshah1995 opened a new issue, #8833:
URL: https://github.com/apache/hudi/issues/8833
# Detailed Steps
# Step 1 Define Imports
```
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when, udf
    from pyspark.sql.types import *
    from functools import reduce
    from faker import Faker
    import pandas as pd
    import boto3
    import json
    from datetime import datetime, date, timedelta
    from pyspark.sql.functions import year, quarter, month, dayofmonth, weekofyear
    from pyspark.sql import functions as F
except Exception as e:
    print(e)
```
# Step 2 Define Spark Session
```
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
```
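Optionally, a quick sanity check that the session came up and the Hudi bundle was resolved (a minimal sketch; `spark.jars.packages` should be populated from the `--packages` entry in `PYSPARK_SUBMIT_ARGS` above):
```
# Print the Spark version and the resolved --packages entry before writing.
print(spark.version)
print(spark.sparkContext.getConf().get("spark.jars.packages", "not set"))
```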
# Step 3 Define Class for Fake Data
```
global faker
faker = Faker()


def get_customer_data(total_customers=2):
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat(),
            "address": faker.address(),
        }
        customers_array.append(customer_data)
    return customers_array


def get_orders_data(customer_ids, order_data_sample_size=3):
    orders_array = []
    for i in range(0, order_data_sample_size):
        try:
            order_id = str(uuid.uuid4())
            customer_id = random.choice(customer_ids)
            order_data = {
                "order_id": order_id,
                "name": faker.text(max_nb_chars=20),
                "order_value": str(random.randint(10, 1000)),
                "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
                "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'),
                "customer_id": customer_id,
            }
            orders_array.append(order_data)
        except Exception as e:
            print(e)
    return orders_array
```
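For reference, the two generators compose like this (a minimal usage sketch; the steps below only materialize the customer side):
```
# Generate a couple of customers, then orders that reference their ids.
customers = get_customer_data(total_customers=2)
customer_ids = [c["customer_id"] for c in customers]
orders = get_orders_data(customer_ids, order_data_sample_size=3)
print(orders[0])
```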
# Step 4 UPSERT method for Hudi
```
def upsert_hudi_table(
        db_name,
        table_name,
        record_id,
        precomb_key,
        spark_df,
        table_type='COPY_ON_WRITE',
        method='upsert',
):
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    print("path", path)

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precomb_key,
        'hoodie.write.commit.callback.http.url': 'http://localhost:5000/',
        'hoodie.write.commit.callback.on': 'true',
        'hoodie.write.commit.callback.http.timeout.seconds': '180',
    }

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)
```
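Note the three `hoodie.write.commit.callback.*` options: on each successful commit, Hudi should POST the commit metadata to the Flask listener shown at the end. To verify a write landed, the table can be read back from the same local path (a sketch, assuming the `hudidb`/`customers` table created below):
```
# Read the Hudi table back from the path layout used by upsert_hudi_table.
df = spark.read.format("hudi").load("file:///C:/tmp/hudidb/customers")
df.select("customer_id", "name", "state").show(5)
```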
# Create Sample DF
```
global total_customers, order_data_sample_size
total_customers = 50
order_data_sample_size = 100

customer_data = get_customer_data(total_customers=total_customers)
spark_df_customers = spark.createDataFrame(
    data=[tuple(i.values()) for i in customer_data],
    schema=list(customer_data[0].keys())
)
spark_df_customers.select(['customer_id', 'name', 'state', 'city', 'email', 'created_at']).show(3)
```
#### Output
```
+--------------------+------------+-----------+------------+--------------------+--------------------+
|         customer_id|        name|      state|        city|               email|          created_at|
+--------------------+------------+-----------+------------+--------------------+--------------------+
|d62fb16d-fc9e-46d...| Gary Travis|   Kentucky| Harperburgh|   [email protected]|2023-05-28T08:33:...|
|2ec5ae28-0f11-47b...| Kathy Davis|     Hawaii|Gabrielville|jcantrell@example...|2023-05-28T08:33:...|
|7a368b35-a5b0-440...|Randy Jarvis|Mississippi| Kellerville|chungandrew@examp...|2023-05-28T08:33:...|
+--------------------+------------+-----------+------------+--------------------+--------------------+
```
# Upsert into Hudi
```
upsert_hudi_table(
    db_name='hudidb',
    table_name='customers',
    record_id='customer_id',
    precomb_key='created_at',
    spark_df=spark_df_customers,
    table_type='COPY_ON_WRITE',
    method='upsert',
)
```
# Error
```
Py4JJavaError: An error occurred while calling o72.save.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:597)
	at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:114)
	at org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:596)
	at org.apache.hudi.common.table.HoodieTableMetaClient.scanFiles(HoodieTableMetaClient.java:549)
	at org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:642)
	at org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:625)
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:163)
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:155)
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:175)
	at org.apache.hudi.common.table.HoodieTableMetaClient.getActiveTimeline(HoodieTableMetaClient.java:352)
	at org.apache.hudi.common.table.HoodieTableMetaClient.getCommitsTimeline(HoodieTableMetaClient.java:565)
	at org.apache.hudi.common.table.HoodieTableMetaClient.isTimelineNonEmpty(HoodieTableMetaClient.java:556)
	at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromLatestCommit(TableSchemaResolver.java:309)
	at org.apache.hudi.HoodieSparkSqlWriter$.getLatestTableSchema(HoodieSparkSqlWriter.scala:637)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1589)
```
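For context, an `UnsatisfiedLinkError` on `NativeIO$Windows.access0` usually means Hadoop's Windows native binaries (`winutils.exe` and `hadoop.dll`) are missing from the environment or do not match the Hadoop version bundled with Spark. A minimal sketch of the usual workaround, assuming the matching binaries live under `C:\hadoop\bin` (a hypothetical path) and are configured before the SparkSession is created:
```
import os

# Hypothetical layout: winutils.exe and hadoop.dll under C:\hadoop\bin,
# built for the same Hadoop version your Spark distribution ships with.
os.environ["HADOOP_HOME"] = "C:\\hadoop"
# hadoop.dll must be loadable by the JVM, so prepend the bin dir to PATH.
os.environ["PATH"] = os.environ["HADOOP_HOME"] + "\\bin;" + os.environ["PATH"]
```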
# Sample Flask App
```
import json

from flask import Flask, request

app = Flask(__name__)


@app.route('/', methods=["GET", "POST"])
def index():
    json_data = request.json
    print(json.dumps(json_data, indent=3))
    return 'Web App with Python Flask!'


app.run(debug=True)
```
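With this listener running, each successful commit should trigger a POST from Hudi to `http://localhost:5000/`, the URL configured in `upsert_hudi_table`. To exercise the endpoint by hand (hypothetical payload; on a real commit, Hudi sends its own commit-callback JSON):
```
import requests

# Hypothetical payload just to confirm the listener prints incoming JSON.
requests.post("http://localhost:5000/",
              json={"commitTime": "20230528083300", "tableName": "customers"})
```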