tomyanth commented on issue #7653:
URL: https://github.com/apache/hudi/issues/7653#issuecomment-1538086919

   
   """
   
   Install
   https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop2.tgz
   
   hadoop2.7
   
https://github.com/soumilshah1995/winutils/blob/master/hadoop-2.7.7/bin/winutils.exe
   
   pyspark --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 --conf 
'spark.serializer=org.apache.spark.serializer.KryoSerializer'
   
   VAR
   SPARK_HOME
   HADOOP_HOME
   
   
   PATH
   `%HAPOOP_HOME%\bin`
   `%SPARK_HOME%\bin`
   
   Complete Tutorials on HUDI
   
https://github.com/soumilshah1995/Insert-Update-Read-Write-SnapShot-Time-Travel-incremental-Query-on-APache-Hudi-transacti/blob/main/hudi%20(1).ipynb
   """
   
   
   
   import os
   import sys
   import uuid
   import datetime
   from functools import reduce

   import findspark
   import pyspark
   from pyspark.sql import SparkSession
   from pyspark import SparkConf, SparkContext
   from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when
   from pyspark.sql.types import *
   from faker import Faker

   time = datetime.datetime.now()
   time = time.strftime("YMD%Y%m%dHHMMSSms%H%M%S%f")
   
   
   SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 
pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
   
   findspark.init()
   
   spark = SparkSession.builder \
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
       .config('className', 'org.apache.hudi') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
       .config('spark.sql.warehouse.dir', 'file:///C:/tmp/spark_warehouse') \
       .getOrCreate()
   
   
   faker = Faker()
   
   
   class DataGenerator(object):
   
       @staticmethod
       def get_data():
           return [
               (
                   x,
                   faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                   faker.random_int(min=10000, max=150000),
                   faker.random_int(min=18, max=60),
                   faker.random_int(min=0, max=100000),
                   faker.unix_time()
               ) for x in range(5)
           ]
   
   
   data = DataGenerator.get_data()
   
   columns = ["emp_id", "employee_name", "department", "state", "salary", 
"age", "bonus", "ts"]
   spark_df = spark.createDataFrame(data=data, schema=columns)
   spark_df.show()
   
   db_name = "hudidb"
   table_name = "hudi_table"
   recordkey = 'emp_id'
   precombine = 'ts'
   
   path = "file:///C:/tmp/spark_warehouse"
   method = 'upsert'
   table_type = "COPY_ON_WRITE"
   
   hudi_options = {
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.precombine.field': precombine,
       'hoodie.upsert.shuffle.parallelism': 2,
       'hoodie.insert.shuffle.parallelism': 2,
       'hoodie.schema.on.read.enable': 'true',  # allows changing column names on read

       # Optimistic concurrency control, added for ZooKeeper to deal with multiple concurrent writers.
       'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
       'hoodie.cleaner.policy.failed.writes': 'LAZY',
       # 'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',
       'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
       'hoodie.write.lock.zookeeper.url': 'localhost',
       'hoodie.write.lock.zookeeper.port': '2181',
       'hoodie.write.lock.zookeeper.lock_key': 'my_lock',
       'hoodie.write.lock.zookeeper.base_path': '/hudi_locks',
   }
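
   For completeness, a minimal single-writer variant of these options (an assumption for local testing without ZooKeeper; Hudi's default concurrency mode is single writer, so the lock settings can simply be omitted):

   # Sketch: same write options without optimistic concurrency control / ZooKeeper locks.
   hudi_options_single_writer = {
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.precombine.field': precombine,
       'hoodie.upsert.shuffle.parallelism': 2,
       'hoodie.insert.shuffle.parallelism': 2,
       'hoodie.schema.on.read.enable': 'true',
   }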
   
   print("*"*55)
   print("over-write")
   print("*"*55)
   
   spark_df.write.format("hudi"). \
       options(**hudi_options). \
       mode("overwrite"). \
       save(path)
   
   print("*"*55)
   print("READ")
   print("*"*55)
   read_df = spark.read. \
       format("hudi"). \
       load(path)
   read_df.show()
   
   simpleDataUpd = [
       (6, "This is APPEND4", "Sales", "RJ", 81000, 30, 23000, 827307999),
       (7, "This is APPEND4", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
   ]

   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   usr_up_df = spark.createDataFrame(data=simpleDataUpd, schema=columns)
   
   
   
usr_up_df.write.format("hudi").options(**hudi_options).mode("append").save(path)
   
   print("*"*55)
   print("READ")
   print("*"*55)
   read_df = spark.read. \
       format("hudi"). \
       load(path)
   read_df.show()
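
   The linked notebook also covers snapshot, incremental, and time-travel queries; a minimal sketch of a time-travel read on this same table (the instant string below is a placeholder, not a commit from this run, and must match an actual commit time from the table's timeline):

   # Sketch: read the table as of an earlier commit instant (time travel).
   time_travel_df = spark.read. \
       format("hudi"). \
       option("as.of.instant", "20230505120000000"). \
       load(path)
   time_travel_df.show()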

