soumilshah1995 commented on issue #8040:
URL: https://github.com/apache/hudi/issues/8040#issuecomment-1443831871
Sample code:
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import *
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
import datetime
from awsglue import DynamicFrame
import boto3
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv,
                          ["JOB_NAME", "database_name",
                           "kinesis_table_name", "starting_position_of_kinesis_iterator",
                           "hudi_table_name", "window_size", "s3_path_hudi",
                           "s3_path_spark"])
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
database_name = args["database_name"]
kinesis_table_name = args["kinesis_table_name"]
hudi_table_name = args["hudi_table_name"]
s3_path_hudi = args["s3_path_hudi"]
s3_path_spark = args["s3_path_spark"]
print("***********")
print(f"""
database_name {database_name}
kinesis_table_name = {kinesis_table_name}
hudi_table_name ={hudi_table_name}
s3_path_hudi = {s3_path_hudi}
s3_path_spark = {s3_path_spark}
""")
# can be set to "latest", "trim_horizon" or "earliest"
starting_position_of_kinesis_iterator = args["starting_position_of_kinesis_iterator"]
# The amount of time to spend processing each batch
window_size = args["window_size"]
data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
    database=database_name,
    table_name=kinesis_table_name,
    transformation_ctx="DataSource0",
    additional_options={"inferSchema": "true",
                        "startingPosition": starting_position_of_kinesis_iterator}
)
# config
commonConfig = {
    'path': s3_path_hudi
}
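# Hudi write settings. With CustomKeyGenerator the partition path is declared
# as field:TYPE pairs (SIMPLE or TIMESTAMP); year/month/day are SIMPLE here,
# and the timebased.* properties only apply to TIMESTAMP-typed fields.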
hudiWriteConfig = {
    'className': 'org.apache.hudi',
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.precombine.field': 'date',
    'hoodie.datasource.write.recordkey.field': '_id',
    'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE,month:SIMPLE,day:SIMPLE',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
    'hoodie.deltastreamer.keygen.timebased.timestamp.type': 'MIXED',
    # note: 'MM' is months in this date format; lowercase 'mm' would mean minutes
    'hoodie.deltastreamer.keygen.timebased.input.dateformat': 'yyyy-MM-dd',
    'hoodie.deltastreamer.keygen.timebased.output.dateformat': 'yyyy/MM/dd'
}
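# Glue Data Catalog sync settings: with use_jdbc disabled, Hudi registers the
# table and its year/month/day partitions through the Hive metastore client
# (backed by the Glue Data Catalog) instead of a JDBC connection.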
hudiGlueConfig = {
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': hudi_table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': 'year,month,day'
}
combinedConf = {
    **commonConfig,
    **hudiWriteConfig,
    **hudiGlueConfig
}
# Ensure the incoming record matches the current table schema. Brand-new
# columns are fine; if a column exists in the current schema but not in the
# incoming record, add it (as null) before inserting.
def evolveSchema(kinesis_df, table, forcecast=False):
    try:
        # get existing table's schema
        glue_catalog_df = spark.sql("SELECT * FROM " + table + " LIMIT 0")
        # sanitize for hudi specific system columns
        columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
                           '_hoodie_partition_path', '_hoodie_file_name']
        glue_catalog_df_sanitized = glue_catalog_df.drop(*columns_to_drop)
        if kinesis_df.schema != glue_catalog_df_sanitized.schema:
            merged_df = kinesis_df.unionByName(glue_catalog_df_sanitized, allowMissingColumns=True)
            return merged_df
    except Exception as e:
        print(e)
    return kinesis_df
def processBatch(data_frame, batchId):
    if data_frame.count() > 0:
        kinesis_dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_kinesis_data_frame")
        kinesis_data_frame = kinesis_dynamic_frame.toDF()
        kinesis_data_frame = evolveSchema(kinesis_data_frame, database_name + '.' + hudi_table_name, False)

        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(kinesis_data_frame, glueContext, "evolved_kinesis_data_frame"),
            connection_type="custom.spark",
            connection_options=combinedConf
        )
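# Drive the streaming query: each micro-batch (one window_size of Kinesis
# records) is passed to processBatch, and progress is checkpointed to
# s3_path_spark so a restarted job resumes from the last committed offset.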
glueContext.forEachBatch(
    frame=data_frame_DataSource0,
    batch_function=processBatch,
    options={
        "windowSize": window_size,
        "checkpointLocation": s3_path_spark
    }
)
job.commit()
```
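In case it helps anyone trying this out, here is a minimal sketch of starting the job with boto3 and passing the arguments the script reads via `getResolvedOptions`. The job name and every value below are placeholders, not taken from the job above:

```python
import boto3

glue = boto3.client("glue", region_name="us-east-1")

# JobName and all argument values are hypothetical -- adjust to your setup.
response = glue.start_job_run(
    JobName="hudi-kinesis-streaming-job",
    Arguments={
        "--database_name": "my_glue_database",
        "--kinesis_table_name": "my_kinesis_catalog_table",
        "--starting_position_of_kinesis_iterator": "trim_horizon",
        "--hudi_table_name": "my_hudi_table",
        "--window_size": "60 seconds",
        "--s3_path_hudi": "s3://my-bucket/hudi/",
        "--s3_path_spark": "s3://my-bucket/spark-checkpoints/",
    },
)
print(response["JobRunId"])
```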