soumilshah1995 opened a new issue, #8833:
URL: https://github.com/apache/hudi/issues/8833

   
   Detailed Steps 
   
   # Step 1 Define Imports 
   ```
   try:
       import os
       import sys
       import uuid
       import random
       
       import pyspark
       from pyspark.sql import SparkSession
       from pyspark import SparkConf, SparkContext
       from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when, udf
       from pyspark.sql.types import *
       from functools import reduce
       from faker import Faker
       import pandas as pd
       
       import boto3
       import json
       from datetime import datetime, date, timedelta
   
       from pyspark.sql.functions import year, quarter, month, dayofmonth, weekofyear
       from pyspark.sql import functions as F
   except Exception as e:
       print(e)
   ```
   
   # Step 2 Define Spark Session 
   
   ```
   SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
       
   spark = SparkSession.builder \
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
       .config('className', 'org.apache.hudi') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   ```
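
   Side note: the `file:///C:/...` write used later in this report goes through Hadoop's native Windows IO layer, which needs `winutils.exe` and `hadoop.dll`. A minimal sketch of wiring that in, assuming the binaries for the matching Hadoop version live under `C:\hadoop\bin` (an assumed location, and the variables must be set before the SparkSession above is created):

   ```
   # Illustrative only: point Hadoop at a winutils installation on Windows.
   # C:\hadoop is an assumed location, not something from this repro.
   # Set these BEFORE building the SparkSession so the JVM picks them up.
   import os

   os.environ["HADOOP_HOME"] = r"C:\hadoop"
   os.environ["PATH"] = r"C:\hadoop\bin;" + os.environ["PATH"]
   ```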
   
   # Step 3 Define Helper Functions for Fake Data
   ```
   faker = Faker()

   def get_customer_data(total_customers=2):
       customers_array = []
       for i in range(0, total_customers):
           customer_data = {
               "customer_id": str(uuid.uuid4()),
               "name": faker.name(),
               "state": faker.state(),
               "city": faker.city(),
               "email": faker.email(),
               "created_at": datetime.now().isoformat(),
               "address": faker.address(),
           }
           customers_array.append(customer_data)
       return customers_array

   def get_orders_data(customer_ids, order_data_sample_size=3):
       orders_array = []
       for i in range(0, order_data_sample_size):
           try:
               order_id = str(uuid.uuid4())
               customer_id = random.choice(customer_ids)
               order_data = {
                   "order_id": order_id,
                   "name": faker.text(max_nb_chars=20),
                   "order_value": str(random.randint(10, 1000)),
                   "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
                   "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'),
                   "customer_id": customer_id,
               }
               orders_array.append(order_data)
           except Exception as e:
               print(e)
       return orders_array
   ```
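
   A quick illustrative check of the two helpers together (a minimal sketch using the default sample sizes; `json` comes from the Step 1 imports):

   ```
   # Minimal sketch: generate customers, then orders that reference
   # their customer_ids, and inspect one order record.
   customers = get_customer_data(total_customers=2)
   customer_ids = [c["customer_id"] for c in customers]
   orders = get_orders_data(customer_ids, order_data_sample_size=3)
   print(json.dumps(orders[0], indent=3))
   ```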
   # Step 4 UPSERT method for Hudi 
   ```
   def upsert_hudi_table(
           db_name,
           table_name,
           record_id,
           precomb_key,
           spark_df,
           table_type='COPY_ON_WRITE',
           method='upsert',
   ):
   
       path = f"file:///C:/tmp/{db_name}/{table_name}"
       print("path", path, end="\n")
       
       hudi_options = {
           
           'hoodie.table.name': table_name,
           'hoodie.datasource.write.table.type': table_type,
           'hoodie.datasource.write.recordkey.field': record_id,
           'hoodie.datasource.write.table.name': table_name,
           'hoodie.datasource.write.operation': method,
           'hoodie.datasource.write.precombine.field': precomb_key
           
           ,"hoodie.write.commit.callback.http.url":"http://localhost:5000/";
           ,"hoodie.write.commit.callback.on":"true"
           ,"hoodie.write.commit.callback.http.timeout.seconds":"180"
       }
       spark_df.write.format("hudi"). \
           options(**hudi_options). \
           mode("append"). \
           save(path)
   ```
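
   (The three `hoodie.write.commit.callback.*` options enable Hudi's HTTP commit callback, which POSTs commit metadata to the given URL after each successful commit; the Flask app at the end of this report is the intended receiver.)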
   
   # Step 5 Create Sample DF
   ```
   total_customers = 50
   order_data_sample_size = 100

   customer_data = get_customer_data(total_customers=total_customers)
   spark_df_customers = spark.createDataFrame(
       data=[tuple(i.values()) for i in customer_data],
       schema=list(customer_data[0].keys()))

   spark_df_customers.select(['customer_id', 'name', 'state', 'city', 'email', 'created_at']).show(3)
   ```
   #### Output
   ```
   
   +--------------------+------------+-----------+------------+--------------------+--------------------+
   |         customer_id|        name|      state|        city|               email|          created_at|
   +--------------------+------------+-----------+------------+--------------------+--------------------+
   |d62fb16d-fc9e-46d...| Gary Travis|   Kentucky| Harperburgh| [email protected]|2023-05-28T08:33:...|
   |2ec5ae28-0f11-47b...| Kathy Davis|     Hawaii|Gabrielville|jcantrell@example...|2023-05-28T08:33:...|
   |7a368b35-a5b0-440...|Randy Jarvis|Mississippi| Kellerville|chungandrew@examp...|2023-05-28T08:33:...|
   +--------------------+------------+-----------+------------+--------------------+--------------------+
   ```
   
   # Step 6 Upsert into Hudi
   
   ```
   upsert_hudi_table(
       db_name='hudidb',
       table_name='customers',
       record_id='customer_id',
       precomb_key='created_at',
       spark_df=spark_df_customers,
       table_type='COPY_ON_WRITE',
       method='upsert',
   )
   ```
   
   # Error 
   ```
   Py4JJavaError: An error occurred while calling o72.save.
   : java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
        at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
        at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
        at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
        at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:597)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:114)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:596)
        at org.apache.hudi.common.table.HoodieTableMetaClient.scanFiles(HoodieTableMetaClient.java:549)
        at org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:642)
        at org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:625)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:163)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:155)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:175)
        at org.apache.hudi.common.table.HoodieTableMetaClient.getActiveTimeline(HoodieTableMetaClient.java:352)
        at org.apache.hudi.common.table.HoodieTableMetaClient.getCommitsTimeline(HoodieTableMetaClient.java:565)
        at org.apache.hudi.common.table.HoodieTableMetaClient.isTimelineNonEmpty(HoodieTableMetaClient.java:556)
        at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromLatestCommit(TableSchemaResolver.java:309)
        at org.apache.hudi.HoodieSparkSqlWriter$.getLatestTableSchema(HoodieSparkSqlWriter.scala:637)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
        at java.base/java.lang.reflect.Method.invoke(Method.java:578)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:1589)
   
   ```
   
   # Sample Flask App
   ```
   import json
   from flask import Flask, request
   
   app = Flask(__name__)
   
   
   @app.route('/', methods=["GET", "POST"])
   def index():
       json_data  = request.json
       print(json.dumps(json_data, indent=3))
       return 'Web App with Python Flask!'
   
   
   app.run(debug=True)
   ```
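
   A quick way to confirm the listener is reachable before triggering the Hudi write (a minimal sketch, assuming the `requests` package is installed and the app above is running):

   ```
   # Illustrative only: POST a dummy JSON payload to the local listener
   # and check that it prints the body and returns a 200 response.
   import requests

   resp = requests.post("http://localhost:5000/", json={"ping": "hello"})
   print(resp.status_code, resp.text)
   ```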
   

