soumilshah1995 commented on issue #9170:
URL: https://github.com/apache/hudi/issues/9170#issuecomment-1631709692

   Can you try this code?
   
   ```
   
   try:
       import sys
       from awsglue.transforms import *
       from awsglue.utils import getResolvedOptions
   
       from pyspark.sql.session import SparkSession
       from pyspark.context import SparkContext
   
       from awsglue.context import GlueContext
       from awsglue.job import Job
   
       from pyspark.sql import DataFrame, Row
       from pyspark.sql.functions import *
       from pyspark.sql.functions import col, to_timestamp, 
monotonically_increasing_id, to_date, when
       import datetime
       from awsglue import DynamicFrame
       import pyspark.sql.functions as F
       import boto3
   except Exception as e:
       print("ERROR IMPORTS ", e)
   
   """
   
   {
       "version": "0",
       "id": "6b437ef5-9686-f7c7-b7dd-a2438bf5c92c",
       "detail-type": "order",
       "source": "order",
       "account": "043916019468",
       "time": "2023-02-12T10:42:48Z",
       "region": "us-west-1",
       "resources": [],
       "detail": {
           "orderid": "a59a6e61-ebb3-4442-8888-53d9654ab3eb",
           "customer_id": "71d03aa7-5179-4c62-ab7d-e9cbf981d889",
           "ts": "2023-02-12T10:42:48.264802",
           "order_value": "691",
           "priority": "MEDIUM"
       }
   }
   -----------
   INPUT
   -----------
   root
    |-- account: string (nullable = true)
    |-- detail: struct (nullable = true)
    |    |-- customer_id: string (nullable = true)
    |    |-- order_value: string (nullable = true)
    |    |-- orderid: string (nullable = true)
    |    |-- priority: string (nullable = true)
    |    |-- ts: string (nullable = true)
    |-- detail-type: string (nullable = true)
    |-- id: string (nullable = true)
    |-- region: string (nullable = true)
    |-- resources: array (nullable = true)
    |    |-- element: string (containsNull = true)
    |-- source: string (nullable = true)
    |-- time: string (nullable = true)
    |-- version: string (nullable = true)
   
   -----------
   OUTPUT
   -----------
   root
    |-- account: string (nullable = true)
    |-- detail_type: string (nullable = true)
    |-- event_id: string (nullable = true)
    |-- region: string (nullable = true)
    |-- source: string (nullable = true)
    |-- time: string (nullable = true)
    |-- version: string (nullable = true)
    |-- detail_customer_id: string (nullable = true)
    |-- detail_order_value: string (nullable = true)
    |-- detail_orderid: string (nullable = true)
    |-- detail_priority: string (nullable = true)
    |-- detail_ts: string (nullable = true)
   
   """
   
   
   def create_spark_session():
       spark = SparkSession \
           .builder \
           .config('spark.serializer', 
'org.apache.spark.serializer.KryoSerializer') \
           .getOrCreate()
       return spark
   
   
   spark = create_spark_session()
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   
   # ====================== Settings 
============================================
   
   
   db_name = "hudidb"
   kinesis_table_name = 'kinesis_order_events'
   table_name = "order"
   
   record_key = 'detail_orderid'
   precomb = 'detail_ts'
   s3_bucket = 'sXXXXX'
   s3_path_hudi = f's3a://{s3_bucket}/{table_name}/'
   s3_path_spark = f's3://{s3_bucket}/spark_checkpoints/'
   
   method = 'upsert'
   table_type = "MERGE_ON_READ"
   
   window_size = '10 seconds'
   starting_position_of_kinesis_iterator = 'trim_horizon'
   
   connection_options = {
       'hoodie.table.name': table_name,
       "hoodie.datasource.write.storage.type": table_type,
       'hoodie.datasource.write.recordkey.field': record_key,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.precombine.field': precomb,
   
       'hoodie.datasource.hive_sync.enable': 'true',
       "hoodie.datasource.hive_sync.mode": "hms",
       'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
       'hoodie.datasource.hive_sync.database': db_name,
       'hoodie.datasource.hive_sync.table': table_name,
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.partition_extractor_class': 
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       'hoodie.datasource.write.hive_style_partitioning': 'true',
   
   }
   
   # ======================================================================
   
   
   starting_position_of_kinesis_iterator = starting_position_of_kinesis_iterator
   
   data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
       database=db_name,
       table_name=kinesis_table_name,
       transformation_ctx="DataSource0",
       additional_options={"inferSchema": "true", "startingPosition": 
starting_position_of_kinesis_iterator}
   )
   
   
   def flatten_df(nested_df):
       flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
       nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
   
       flat_df = nested_df.select(flat_cols +
                                  [F.col(nc + '.' + c).alias(nc + '_' + c)
                                   for nc in nested_cols
                                   for c in nested_df.select(nc + 
'.*').columns])
       return flat_df
   
   
   def process_batch(data_frame, batchId):
       if (data_frame.count() > 0):
           kinesis_dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, 
"from_kinesis_data_frame")
           kinesis_spark_df = kinesis_dynamic_frame.toDF()
   
           spark_df = flatten_df(kinesis_spark_df)
           spark_df = spark_df.withColumnRenamed('id', 'event_id')
           spark_df = spark_df.withColumnRenamed('detail-type', 'detail_type')
           spark_df = spark_df.drop("resources")
           spark_df = spark_df.drop("time")
   
           try:
               
spark_df.write.format("hudi").options(**connection_options).mode("append").save(s3_path_hudi)
           except Exception as e:
               pass
   
   
   glueContext.forEachBatch(
       frame=data_frame_DataSource0,
       batch_function=process_batch,
       options={
           "windowSize": window_size,
           "checkpointLocation": s3_path_spark
       }
   )
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to