soumilshah1995 commented on issue #9170:
URL: https://github.com/apache/hudi/issues/9170#issuecomment-1631709692
Can you try this code?
```
# Guarded imports: AWS Glue jobs surface missing-dependency errors poorly,
# so report any import failure explicitly instead of failing opaquely.
try:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.session import SparkSession
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql import DataFrame, Row
    from pyspark.sql.functions import *
    # BUG FIX: the original split this import across two physical lines with
    # no parentheses or backslash, which is a SyntaxError; wrap in parens.
    from pyspark.sql.functions import (
        col,
        to_timestamp,
        monotonically_increasing_id,
        to_date,
        when,
    )
    import datetime
    from awsglue import DynamicFrame
    import pyspark.sql.functions as F
    import boto3
except Exception as e:
    print("ERROR IMPORTS ", e)
"""
{
"version": "0",
"id": "6b437ef5-9686-f7c7-b7dd-a2438bf5c92c",
"detail-type": "order",
"source": "order",
"account": "043916019468",
"time": "2023-02-12T10:42:48Z",
"region": "us-west-1",
"resources": [],
"detail": {
"orderid": "a59a6e61-ebb3-4442-8888-53d9654ab3eb",
"customer_id": "71d03aa7-5179-4c62-ab7d-e9cbf981d889",
"ts": "2023-02-12T10:42:48.264802",
"order_value": "691",
"priority": "MEDIUM"
}
}
-----------
INPUT
-----------
root
|-- account: string (nullable = true)
|-- detail: struct (nullable = true)
| |-- customer_id: string (nullable = true)
| |-- order_value: string (nullable = true)
| |-- orderid: string (nullable = true)
| |-- priority: string (nullable = true)
| |-- ts: string (nullable = true)
|-- detail-type: string (nullable = true)
|-- id: string (nullable = true)
|-- region: string (nullable = true)
|-- resources: array (nullable = true)
| |-- element: string (containsNull = true)
|-- source: string (nullable = true)
|-- time: string (nullable = true)
|-- version: string (nullable = true)
-----------
OUTPUT
-----------
root
|-- account: string (nullable = true)
|-- detail_type: string (nullable = true)
|-- event_id: string (nullable = true)
|-- region: string (nullable = true)
|-- source: string (nullable = true)
|-- time: string (nullable = true)
|-- version: string (nullable = true)
|-- detail_customer_id: string (nullable = true)
|-- detail_order_value: string (nullable = true)
|-- detail_orderid: string (nullable = true)
|-- detail_priority: string (nullable = true)
|-- detail_ts: string (nullable = true)
"""
def create_spark_session():
    """Return a SparkSession configured with the Kryo serializer.

    Hudi recommends Kryo for efficient record serialization; getOrCreate
    reuses an existing session if one is already active in the Glue job.
    """
    builder = SparkSession.builder.config(
        'spark.serializer',
        'org.apache.spark.serializer.KryoSerializer',
    )
    return builder.getOrCreate()
# Bootstrap the Spark and Glue contexts for this streaming job.
spark = create_spark_session()
sc = spark.sparkContext
glueContext = GlueContext(sc)
# ====================== Settings ======================================
# BUG FIX: the original banner comment was split so its second line was a
# bare run of '=' characters — a SyntaxError. Keep the banner on one line.
db_name = "hudidb"                         # Glue/Hive database to sync into
kinesis_table_name = 'kinesis_order_events'  # catalog table backed by Kinesis
table_name = "order"                       # Hudi table name
record_key = 'detail_orderid'              # Hudi record key (unique per order)
precomb = 'detail_ts'                      # pre-combine field: latest ts wins
s3_bucket = 'sXXXXX'                       # placeholder — set your bucket name
s3_path_hudi = f's3a://{s3_bucket}/{table_name}/'          # Hudi table path
s3_path_spark = f's3://{s3_bucket}/spark_checkpoints/'     # stream checkpoints
method = 'upsert'                          # Hudi write operation
table_type = "MERGE_ON_READ"               # Hudi storage type
window_size = '10 seconds'                 # micro-batch window
starting_position_of_kinesis_iterator = 'trim_horizon'  # read from oldest record
# Hudi writer + Hive-sync configuration passed to every batch write.
connection_options = {
    # Table identity and write behavior.
    'hoodie.table.name': table_name,
    "hoodie.datasource.write.storage.type": table_type,
    'hoodie.datasource.write.recordkey.field': record_key,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precomb,
    # Sync the table into the Glue Data Catalog via the Hive metastore.
    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode": "hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class':
        'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
}
# ======================================================================
# Stream the Kinesis-backed catalog table as a Spark streaming DataFrame.
# FIX: removed the original's no-op self-assignment of
# starting_position_of_kinesis_iterator to itself.
data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
    database=db_name,
    table_name=kinesis_table_name,
    transformation_ctx="DataSource0",
    additional_options={
        "inferSchema": "true",
        # trim_horizon: begin with the oldest record still retained in Kinesis.
        "startingPosition": starting_position_of_kinesis_iterator,
    },
)
def flatten_df(nested_df):
    """Flatten one level of struct columns in *nested_df*.

    Each ``parent.child`` struct field becomes a top-level column named
    ``parent_child``; non-struct columns pass through unchanged.
    """
    plain_cols, struct_cols = [], []
    for name, dtype in nested_df.dtypes:
        # dtype strings for struct columns start with 'struct'
        (struct_cols if dtype[:6] == 'struct' else plain_cols).append(name)
    exploded = [
        F.col(parent + '.' + child).alias(parent + '_' + child)
        for parent in struct_cols
        for child in nested_df.select(parent + '.*').columns
    ]
    return nested_df.select(plain_cols + exploded)
def process_batch(data_frame, batchId):
    """Per-micro-batch handler: flatten Kinesis events and upsert into Hudi.

    Parameters
    ----------
    data_frame : pyspark.sql.DataFrame
        The micro-batch of raw events delivered by Glue's forEachBatch.
    batchId :
        Batch identifier supplied by Glue (used only for error reporting).
    """
    if data_frame.count() == 0:
        return  # nothing arrived in this window
    # FIX: the original converted DataFrame -> DynamicFrame -> DataFrame,
    # an identity round-trip; operate on the incoming DataFrame directly.
    spark_df = flatten_df(data_frame)
    spark_df = spark_df.withColumnRenamed('id', 'event_id')
    # 'detail-type' contains a hyphen, which Hive/Hudi column names reject.
    spark_df = spark_df.withColumnRenamed('detail-type', 'detail_type')
    spark_df = spark_df.drop("resources").drop("time")
    try:
        spark_df.write.format("hudi") \
            .options(**connection_options) \
            .mode("append") \
            .save(s3_path_hudi)
    except Exception as e:
        # FIX: the original silently swallowed write failures (`pass`),
        # hiding data loss. Stay best-effort, but surface the error in
        # the job log so failed batches are visible.
        print("ERROR writing batch", batchId, e)
# Drive the streaming query: run process_batch on each micro-batch window,
# checkpointing progress to S3 so the job resumes where it left off after
# a restart instead of re-reading the stream from the beginning.
glueContext.forEachBatch(
frame=data_frame_DataSource0,
batch_function=process_batch,
options={
"windowSize": window_size,
"checkpointLocation": s3_path_spark
}
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]