mahesh2247 commented on issue #7688:
URL: https://github.com/apache/hudi/issues/7688#issuecomment-1386588309

   Hey danny0405 and umehrot2, thanks for your reply. I realised that I need to add logic to delete incoming records that carry a "REMOVE" label, but I do not know how to implement this delete logic for Apache Hudi as part of my Glue job script.
   
   ```
   import sys
   from awsglue.transforms import *
   from awsglue.utils import getResolvedOptions
   from pyspark.sql.session import SparkSession
   from pyspark.context import SparkContext
   from awsglue.context import GlueContext
   from awsglue.job import Job
   from pyspark.sql import DataFrame, Row
   from pyspark.sql.functions import * 
   from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
   import datetime
   from awsglue import DynamicFrame
   
   import boto3
   
   ## @params: [JOB_NAME]
   args = getResolvedOptions(sys.argv, ["JOB_NAME", "database_name", "kinesis_table_name",
                                        "starting_position_of_kinesis_iterator", "hudi_table_name",
                                        "window_size", "s3_path_hudi", "s3_path_spark"])
   
   spark = (SparkSession.builder
            .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
            .config('spark.sql.hive.convertMetastoreParquet', 'false')
            .getOrCreate())
                       
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   job.init(args['JOB_NAME'], args)
   
   database_name = args["database_name"]
   kinesis_table_name = args["kinesis_table_name"]
   hudi_table_name = args["hudi_table_name"]
   s3_path_hudi = args["s3_path_hudi"]
   s3_path_spark = args["s3_path_spark"]
   
   commonConfig = {
       'hoodie.datasource.write.hive_style_partitioning': 'true',
       'className': 'org.apache.hudi',
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.write.precombine.field': 'id',
       'hoodie.datasource.write.recordkey.field': 'id',
       'hoodie.table.name': hudi_table_name,
       'hoodie.consistency.check.enabled': 'true',
       'hoodie.datasource.hive_sync.database': database_name,
       'hoodie.datasource.hive_sync.table': hudi_table_name,
       'hoodie.datasource.hive_sync.enable': 'true',
       'path': s3_path_hudi
   }
   
   partitionDataConfig = {
       'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
       'hoodie.datasource.write.partitionpath.field': "partitionkey, partitionkey2 ",
       'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       'hoodie.datasource.hive_sync.partition_fields': "partitionkey, partitionkey2"
   }
   
   incrementalConfig = {
       'hoodie.upsert.shuffle.parallelism': 68,
       'hoodie.datasource.write.operation': 'upsert',
       'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
       'hoodie.cleaner.commits.retained': 2
   }
   
   combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
   
   glue_temp_storage = s3_path_hudi
   
   data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
       database=database_name,
       table_name=kinesis_table_name,
       transformation_ctx="DataSource0",
       additional_options={"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})
   
   def processBatch(data_frame, batchId):
       if (data_frame.count() > 0):
   
           DataSource0 = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
           
           your_map = [
               ('eventName', 'string', 'eventName', 'string'),
               ('userIdentity', 'string', 'userIdentity', 'string'),
               ('eventSource', 'string', 'eventSource', 'string'),
               ('tableName', 'string', 'tableName', 'string'),
               ('recordFormat', 'string', 'recordFormat', 'string'),
               ('eventID', 'string', 'eventID', 'string'),
               ('dynamodb.ApproximateCreationDateTime', 'long', 'ApproximateCreationDateTime', 'long'),
               ('dynamodb.SizeBytes', 'long', 'SizeBytes', 'long'),
               ('dynamodb.NewImage.id.S', 'string', 'id', 'string'),
               ('dynamodb.NewImage.custName.S', 'string', 'custName', 'string'),
               ('dynamodb.NewImage.email.S', 'string', 'email', 'string'),
               ('dynamodb.NewImage.registrationDate.S', 'string', 'registrationDate', 'string'),
               ('awsRegion', 'string', 'awsRegion', 'string')
           ]
   
           new_df = ApplyMapping.apply(frame=DataSource0, mappings=your_map, transformation_ctx="applymapping1")
           abc = new_df.toDF()
           
           inputDf = abc.withColumn('update_ts_dms', abc["registrationDate"]) \
                        .withColumn('partitionkey', abc["id"].substr(-1, 1)) \
                        .withColumn('partitionkey2', abc["id"].substr(-2, 1))
           inputDf = inputDf.filter("eventName !='REMOVE' AND id !='{}'".format(abc['id']))
           
           
   
           # glueContext.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(inputDf, glueContext, "inputDf"),
           #                                              connection_type="marketplace.spark", connection_options=combinedConf)
           glueContext.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(inputDf, glueContext, "inputDf"),
                                                        connection_type="custom.spark", connection_options=combinedConf)
   
   
   glueContext.forEachBatch(frame=data_frame_DataSource0, batch_function=processBatch,
                            options={"windowSize": "10 seconds", "checkpointLocation": s3_path_spark})
   
   
   
   job.commit()
   
   ```
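
   For the actual deletes, one idea I had (not sure if it is the right way to do this with Hudi on Glue, so please treat it as an untested sketch) is to split each micro-batch on eventName inside processBatch, upsert the non-REMOVE records as before, and write the REMOVE records with Hudi's 'delete' operation. The 'key_id' column and the extra your_map entry it would come from ('dynamodb.Keys.id.S') are my assumptions about the stream payload, since NewImage is empty on REMOVE events:

   ```
   # Untested sketch: split each micro-batch on eventName and issue Hudi deletes
   # for the REMOVE events. Assumes your_map also maps 'dynamodb.Keys.id.S' to a
   # hypothetical 'key_id' column, because NewImage (and therefore 'id') is
   # empty on REMOVE events.
   from pyspark.sql.functions import col, coalesce

   def processBatch(data_frame, batchId):
       if data_frame.count() > 0:
           DataSource0 = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
           abc = ApplyMapping.apply(frame=DataSource0, mappings=your_map,
                                    transformation_ctx="applymapping1").toDF()

           # Fall back to the stream's Keys image when NewImage is absent (REMOVE events).
           abc = abc.withColumn('id', coalesce(col('id'), col('key_id')))
           # Note: fields coming from NewImage (e.g. registrationDate) will be null on REMOVE events.
           inputDf = abc.withColumn('update_ts_dms', col('registrationDate')) \
                        .withColumn('partitionkey', col('id').substr(-1, 1)) \
                        .withColumn('partitionkey2', col('id').substr(-2, 1))

           upsertDf = inputDf.filter(col('eventName') != 'REMOVE')
           deleteDf = inputDf.filter(col('eventName') == 'REMOVE')

           # Same Hudi configs as above, but the delete path overrides the write operation.
           deleteConfig = {**combinedConf, 'hoodie.datasource.write.operation': 'delete'}

           if upsertDf.count() > 0:
               glueContext.write_dynamic_frame.from_options(
                   frame=DynamicFrame.fromDF(upsertDf, glueContext, "upsertDf"),
                   connection_type="custom.spark", connection_options=combinedConf)
           if deleteDf.count() > 0:
               glueContext.write_dynamic_frame.from_options(
                   frame=DynamicFrame.fromDF(deleteDf, glueContext, "deleteDf"),
                   connection_type="custom.spark", connection_options=deleteConfig)
   ```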
   
   Currently I am doing something like this. Any inputs?
   ```
   inputDf = inputDf.filter("eventName !='REMOVE' AND id !='{}'".format(abc['id']))
   ```
   But this removes every record that either has the same id as the one being inserted or has eventName 'REMOVE'.
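
   I suspect part of the issue is that abc['id'] is a Column object, so format() just puts its string representation into the filter expression instead of an actual id value. If the delete path sketched above works, I think all the upsert path needs is a plain column filter (same assumptions as the sketch):

   ```
   # Keep only non-REMOVE events for the upsert path; REMOVE events go to the delete path.
   upsertDf = inputDf.filter(col("eventName") != "REMOVE")
   ```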

