soumilshah1995 commented on issue #362:
URL: https://github.com/apache/incubator-xtable/issues/362#issuecomment-1986306167

   # Tests
   
   ```
   """
   %%configure -f
   {
       "conf": {
           "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
           "spark.sql.hive.convertMetastoreParquet": "false",
           "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
           "spark.sql.legacy.pathOptionBehavior.enabled": "true",
           "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
       }
   }
   
   
   %pip install Faker
   
   
   permanently delete
   
   """
   
   
   try:
       import sys, random, uuid
       from pyspark.context import SparkContext
       from pyspark.sql.session import SparkSession
       from awsglue.context import GlueContext
       from awsglue.job import Job
       from awsglue.dynamicframe import DynamicFrame
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from awsglue.utils import getResolvedOptions
       from pyspark.sql.types import *
       from datetime import datetime, date
       import boto3, pandas
       from functools import reduce
       from pyspark.sql import Row
       from faker import Faker
   except Exception as e:
       print("Modules are missing : {} ".format(e))
   
   job_start_ts = datetime.now()
   ts_format = '%Y-%m-%d %H:%M:%S'
   
   spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
            .config('spark.sql.hive.convertMetastoreParquet', 'false') \
            .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
            .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
            .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())
   
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   logger = glueContext.get_logger()
   
   global faker
   faker = Faker()
   
   
   def get_customer_data(total_customers=2):
       customers_array = []
       for i in range(0, total_customers):
           customer_data = {
               "customer_id": str(uuid.uuid4()),
               "name": faker.name(),
               "state": faker.state(),
               "city": faker.city(),
               "email": faker.email(),
               "created_at": datetime.now().isoformat().__str__(),
               "address": faker.address(),
   
           }
           customers_array.append(customer_data)
       return customers_array
   
   
   def get_orders_data(customer_ids, order_data_sample_size=3):
       orders_array = []
       for i in range(0, order_data_sample_size):
           try:
               order_id = uuid.uuid4().__str__()
               customer_id = random.choice(customer_ids)
               order_data = {
                   "order_id": order_id,
                   "name": faker.text(max_nb_chars=20),
                   "order_value": random.randint(10, 1000).__str__(),
                   "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
                   "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'),
                   "customer_id": customer_id,
   
               }
               orders_array.append(order_data)
           except Exception as e:
               print(e)
       return orders_array
   
   
   
   
   
   def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                         enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                         enable_meta_data_indexing,
                         use_sql_transformer, sql_transformer_query,
                         target_path, index_type, method='upsert', clustering_column='default'):
       """
       Upserts a dataframe into a Hudi table.

       Args:
           glue_database (str): The name of the glue database.
           table_name (str): The name of the Hudi table.
           record_id (str): The name of the field in the dataframe that will be used as the record key.
           precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
           table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
           spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
           partition_fields (str): The field(s) used to partition the data.
           enable_partition (bool): Whether or not to enable partitioning.
           enable_cleaner (bool): Whether or not to enable data cleaning.
           enable_hive_sync (bool): Whether or not to enable syncing with Hive.
           enable_clustering (bool): Whether or not to enable inline clustering.
           enable_meta_data_indexing (bool): Whether or not to enable the metadata table and column stats index.
           use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
           sql_transformer_query (str): The SQL query to use for data transformation.
           target_path (str): The path to the target Hudi table.
           index_type (str): The Hudi index type (e.g., BLOOM or GLOBAL_BLOOM).
           method (str): The Hudi write method to use (default is 'upsert').
           clustering_column (str): The column(s) to sort on when clustering is enabled.

       Returns:
           None
       """
       # These are the basic settings for the Hoodie table
       hudi_final_settings = {
           "hoodie.table.name": table_name,
           "hoodie.datasource.write.table.type": table_type,
           "hoodie.datasource.write.operation": method,
           "hoodie.datasource.write.recordkey.field": record_id,
           "hoodie.datasource.write.precombine.field": precomb_key,
       }
   
       # These settings enable syncing with Hive
       hudi_hive_sync_settings = {
           "hoodie.parquet.compression.codec": "gzip",
           "hoodie.datasource.hive_sync.enable": "true",
           "hoodie.datasource.hive_sync.database": glue_database,
           "hoodie.datasource.hive_sync.table": table_name,
           "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
           "hoodie.datasource.hive_sync.use_jdbc": "false",
           "hoodie.datasource.hive_sync.mode": "hms",
       }
   
       # These settings enable automatic cleaning of old data
       hudi_cleaner_options = {
           "hoodie.clean.automatic": "true",
           "hoodie.clean.async": "true",
           "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
           "hoodie.cleaner.fileversions.retained": "3",
           "hoodie.cleaner.parallelism": '200',
           'hoodie.cleaner.commits.retained': 5
       }
   
       # These settings enable partitioning of the data
       partition_settings = {
           "hoodie.datasource.write.partitionpath.field": partition_fields,
           "hoodie.datasource.hive_sync.partition_fields": partition_fields,
           "hoodie.datasource.write.hive_style_partitioning": "true",
       }
   
       hudi_clustering = {
           "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
           "hoodie.clustering.inline": "true",
           "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
           "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
           "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
       }
   
       # Define a dictionary with the index settings for Hudi
       hudi_index_settings = {
           "hoodie.index.type": index_type,  # Specify the index type for Hudi
       }
   
       # Define a dictionary with the file size settings
       hudi_file_size = {
           "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
           "hoodie.parquet.small.file.limit": 104857600,  # 100MB
       }
   
       hudi_meta_data_indexing = {
           "hoodie.metadata.enable": "true",
           "hoodie.metadata.index.async": "true",
           "hoodie.metadata.index.column.stats.enable": "true",
           "hoodie.metadata.index.check.timeout.seconds": "60",
           "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
           "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
       }
   
       # If metadata indexing is enabled, add the metadata index settings to the final settings
       if enable_meta_data_indexing in (True, "True", "true"):
           hudi_final_settings.update(hudi_meta_data_indexing)

       # If clustering is enabled, add the clustering settings to the final settings
       if enable_clustering in (True, "True", "true"):
           hudi_final_settings.update(hudi_clustering)

       # Always add the Hudi index and file size settings to the final settings dictionary
       hudi_final_settings.update(hudi_index_settings)
       hudi_final_settings.update(hudi_file_size)

       # If partitioning is enabled, add the partition settings to the final settings
       if enable_partition in (True, "True", "true"):
           hudi_final_settings.update(partition_settings)

       # If data cleaning is enabled, add the cleaner options to the final settings
       if enable_cleaner in (True, "True", "true"):
           hudi_final_settings.update(hudi_cleaner_options)

       # If Hive syncing is enabled, add the Hive sync settings to the final settings
       if enable_hive_sync in (True, "True", "true"):
           hudi_final_settings.update(hudi_hive_sync_settings)
   
       # If there is data to write, apply any SQL transformations and write to the target path
       if spark_df.count() > 0:
           if use_sql_transformer in (True, "True", "true"):
               spark_df.createOrReplaceTempView("temp")
               spark_df = spark.sql(sql_transformer_query)

           spark_df.write.format("hudi"). \
               options(**hudi_final_settings). \
               mode("append"). \
               save(target_path)
   
   
   # Define total number of customers and order data sample size
   total_customers = 10
   order_data_sample_size = 20
   
   # Generate customer data
   customer_data = get_customer_data(total_customers=total_customers)
   
   # Generate order data
   order_data = get_orders_data(
       order_data_sample_size=order_data_sample_size,
       customer_ids=[i.get("customer_id") for i in customer_data]
   )
   
   # Define schema for customer data
   customer_schema = StructType([
       StructField("customer_id", StringType(), nullable=False),
       StructField("name", StringType(), nullable=True),
       StructField("state", StringType(), nullable=False),
       StructField("city", StringType(), nullable=False),
       StructField("email", StringType(), nullable=False),
       StructField("created_at", StringType(), nullable=False),
       StructField("address", StringType(), nullable=False)
   ])
   
   # Create DataFrame for customer data using the defined schema
   spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data], schema=customer_schema)
   
   # Show the DataFrame
   spark_df_customers.show()
   
   # Define schema for order data (field names and order match the dictionaries produced by get_orders_data)
   order_schema = StructType([
       StructField("order_id", StringType(), nullable=False),
       StructField("name", StringType(), nullable=True),
       StructField("order_value", StringType(), nullable=False),
       StructField("priority", StringType(), nullable=False),
       StructField("order_date", StringType(), nullable=False),
       StructField("customer_id", StringType(), nullable=False)
   ])
   
   
   # Create DataFrame for order data using the defined schema
   spark_df_orders = spark.createDataFrame(data=[tuple(i.values()) for i in order_data], schema=order_schema)
   
   # Show the DataFrame
   spark_df_orders.show()
   
   BUCKET = "soumil-dev-bucket-1995"
   
   upsert_hudi_table(
       glue_database="default",
       table_name="customers",
       record_id="customer_id",
       precomb_key="created_at",
       table_type='COPY_ON_WRITE',
       partition_fields="state",
       method='upsert',
       index_type='BLOOM',
       enable_partition=True,
       enable_cleaner=False,
       enable_hive_sync=True,
       enable_clustering='False',
       clustering_column='default',
       enable_meta_data_indexing='false',
       use_sql_transformer=False,
       sql_transformer_query='default',
       target_path=f"s3://{BUCKET}/silver/table_name=customers/",
       spark_df=spark_df_customers,
   )
   
   upsert_hudi_table(
       glue_database="default",
       table_name="orders",
       record_id="order_id",
       precomb_key="order_date",
       table_type='COPY_ON_WRITE',
       partition_fields="default",
       method='upsert',
       index_type='BLOOM',
       enable_partition=False,
       enable_cleaner=False,
       enable_hive_sync=True,
       enable_clustering='False',
       clustering_column='default',
       enable_meta_data_indexing='false',
       use_sql_transformer=False,
       sql_transformer_query='default',
       target_path=f"s3://{BUCKET}/silver/table_name=orders/",
       spark_df=spark_df_orders,
   )
   
   ```
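
   For a quick sanity check after the job finishes, the Hudi tables can be read straight back from the same target paths used above (a minimal sketch reusing the notebook's `spark` session and `BUCKET` variable):

   ```
   # Read the Hudi tables back from S3 to confirm the upserts landed
   spark.read.format("hudi").load(f"s3://{BUCKET}/silver/table_name=customers/").show()
   spark.read.format("hudi").load(f"s3://{BUCKET}/silver/table_name=orders/").show()
   ```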
   
   
   
   # config.yaml
   ```
   sourceFormat: HUDI
   targetFormats:
     - ICEBERG
   datasets:
     - tableBasePath: s3://soumil-dev-bucket-1995/silver/table_name=customers/
       tableName: customers
       partitionSpec: state:VALUE
   ```
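
   With the config above saved as a YAML file (say `my_config.yaml`, a hypothetical name), the sync is kicked off through the bundled RunSync utility seen in the log below. A minimal sketch of that invocation from Python, assuming the jar sits in the working directory and accepts the `--datasetConfig` flag:

   ```
   # Sketch: shell out to the bundled OneTable RunSync utility with the dataset config above
   import subprocess

   subprocess.run(
       ["java", "-jar", "utilities-0.1.0-beta1-bundled.jar", "--datasetConfig", "my_config.yaml"],
       check=True,
   )
   ```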
   
   # Result 
   ```
   SLF4J: No SLF4J providers were found.
   SLF4J: Defaulting to no-operation (NOP) logger implementation
   SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
   SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
   SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
   2024-03-08 19:40:15 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumil-dev-bucket-1995/silver/table_name=customers/ for following table formats [ICEBERG]
   2024-03-08 19:40:17 INFO  io.onetable.client.OneTableClient:264 - No previous OneTable sync for target. Falling back to snapshot sync.
   # WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
   # WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
   2024-03-08 19:40:27 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG]
   sh-4.2$
   sh-4.2$
   ```
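
   To double-check the Iceberg side, one option is to list the `metadata/` prefix under the synced table's base path (a sketch, assuming default AWS credentials and the standard Iceberg metadata layout):

   ```
   # Sketch: list Iceberg metadata files written under the synced customers table
   import boto3

   s3 = boto3.client("s3")
   resp = s3.list_objects_v2(
       Bucket="soumil-dev-bucket-1995",
       Prefix="silver/table_name=customers/metadata/",
   )
   for obj in resp.get("Contents", []):
       print(obj["Key"])
   ```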
   
   # Solved 

