mahesh2247 opened a new issue, #7688:
URL: https://github.com/apache/hudi/issues/7688
```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import *
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
import datetime
from awsglue import DynamicFrame
import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ["JOB_NAME", "database_name",
    "kinesis_table_name", "starting_position_of_kinesis_iterator",
    "hudi_table_name", "window_size", "s3_path_hudi", "s3_path_spark"])

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()

database_name = args["database_name"]
kinesis_table_name = args["kinesis_table_name"]
hudi_table_name = args["hudi_table_name"]
s3_path_hudi = args["s3_path_hudi"]
s3_path_spark = args["s3_path_spark"]

# Hudi write and Hive sync configuration
commonConfig = {
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'ApproximateCreationDateTime',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.table.name': hudi_table_name,
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': hudi_table_name,
    'hoodie.datasource.hive_sync.enable': 'true',
    'path': s3_path_hudi
}
partitionDataConfig = {
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.partitionpath.field': "partitionkey, partitionkey2 ",
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': "partitionkey, partitionkey2"
}
incrementalConfig = {
    'hoodie.upsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 2
}
combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}

glue_temp_storage = s3_path_hudi

# Streaming source: Kinesis table registered in the Glue Data Catalog
data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
    database=database_name,
    table_name=kinesis_table_name,
    transformation_ctx="DataSource0",
    additional_options={"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})


def processBatch(data_frame, batchId):
    if data_frame.count() > 0:
        DataSource0 = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        # Flatten the DynamoDB stream record into top-level columns
        your_map = [
            ('eventName', 'string', 'eventName', 'string'),
            ('userIdentity', 'string', 'userIdentity', 'string'),
            ('eventSource', 'string', 'eventSource', 'string'),
            ('tableName', 'string', 'tableName', 'string'),
            ('recordFormat', 'string', 'recordFormat', 'string'),
            ('eventID', 'string', 'eventID', 'string'),
            ('dynamodb.ApproximateCreationDateTime', 'long', 'ApproximateCreationDateTime', 'long'),
            ('dynamodb.SizeBytes', 'long', 'SizeBytes', 'long'),
            ('dynamodb.NewImage.id.S', 'string', 'id', 'string'),
            ('dynamodb.NewImage.custName.S', 'string', 'custName', 'string'),
            ('dynamodb.NewImage.email.S', 'string', 'email', 'string'),
            ('dynamodb.NewImage.registrationDate.S', 'string', 'registrationDate', 'string'),
            ('awsRegion', 'string', 'awsRegion', 'string')
        ]
        new_df = ApplyMapping.apply(frame=DataSource0, mappings=your_map, transformation_ctx="applymapping1")
        abc = new_df.toDF()
        logger.info("This is abc = {}".format(abc))

        if str(abc["eventName"]) == "REMOVE":
            inputDf = abc.withColumn('Result', when(abc.id != abc["id"], "True")) \
                .filter("Result==True") \
                .drop("Result")
        else:
            inputDf = abc.withColumn('update_ts_dms', to_timestamp(abc["registrationDate"])) \
                .withColumn('partitionkey', abc["id"].substr(-1, 1)) \
                .withColumn('partitionkey2', abc["id"].substr(-2, 1))

        # glueContext.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type="marketplace.spark", connection_options=combinedConf)

        # Write the micro-batch to Hudi through the custom Spark connector
        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(inputDf, glueContext, "inputDf"),
            connection_type="custom.spark",
            connection_options=combinedConf)


glueContext.forEachBatch(
    frame=data_frame_DataSource0,
    batch_function=processBatch,
    options={"windowSize": "10 seconds", "checkpointLocation": s3_path_spark})
job.commit()
```
The job fails with ```StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last): Error``` and, when processing deletes, with ```HoodieUpsertException: Failed to upsert for commit time 20230117170914433```. Kindly help.
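
For context, this is the per-batch delete handling I am aiming for. The sketch below is illustrative only, not the code currently running: it assumes the REMOVE rows can be filtered per row with `col("eventName")` and written through the same connector with `hoodie.datasource.write.operation` set to `delete`, and the name `write_batch_sketch` is made up for this example. It reuses `glueContext`, `DynamicFrame`, and `combinedConf` from the job above.

```python
from pyspark.sql.functions import col, to_timestamp

def write_batch_sketch(abc):
    # abc: the flattened per-batch DataFrame produced by ApplyMapping above.
    # Derive the partition columns for every row; a Hudi delete still needs
    # the same record key and partition path columns as the upserted rows.
    enriched = (abc
                .withColumn('update_ts_dms', to_timestamp(abc["registrationDate"]))
                .withColumn('partitionkey', abc["id"].substr(-1, 1))
                .withColumn('partitionkey2', abc["id"].substr(-2, 1)))

    # Split the micro-batch per row instead of branching on the whole DataFrame.
    deletes = enriched.filter(col("eventName") == "REMOVE")
    upserts = enriched.filter(col("eventName") != "REMOVE")

    if upserts.count() > 0:
        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(upserts, glueContext, "upserts"),
            connection_type="custom.spark",
            connection_options=combinedConf)

    if deletes.count() > 0:
        # Same Hudi config, with only the write operation switched to delete.
        deleteConf = {**combinedConf, 'hoodie.datasource.write.operation': 'delete'}
        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(deletes, glueContext, "deletes"),
            connection_type="custom.spark",
            connection_options=deleteConf)
```

Is this the right direction for handling REMOVE events from the DynamoDB stream, or is there a recommended way to route deletes to Hudi from a Glue streaming job?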