mahesh2247 commented on issue #7688:
URL: https://github.com/apache/hudi/issues/7688#issuecomment-1386588309
Hey danny0405 and umehrot2. Thanks for your reply. I realised that I needed
to add a logic to delete incoming data streams with a "REMOVE" label in it but
do not know how to implement this delete records logic as part of glue jobs for
apache hudi in my script.
```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import *
from pyspark.sql.functions import col, to_timestamp,
monotonically_increasing_id, to_date, when
import datetime
from awsglue import DynamicFrame
import boto3
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ["JOB_NAME", "database_name",
"kinesis_table_name", "starting_position_of_kinesis_iterator",
"hudi_table_name", "window_size", "s3_path_hudi", "s3_path_spark" ])
spark =
SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').config('spark.sql.hive.convertMetastoreParquet','false').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
database_name = args["database_name"]
kinesis_table_name = args["kinesis_table_name"]
hudi_table_name = args["hudi_table_name"]
s3_path_hudi = args["s3_path_hudi"]
s3_path_spark = args["s3_path_spark"]
commonConfig = {'hoodie.datasource.write.hive_style_partitioning' :
'true','className' : 'org.apache.hudi',
'hoodie.datasource.hive_sync.use_jdbc':'false',
'hoodie.datasource.write.precombine.field': 'id',
'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.table.name':
hudi_table_name, 'hoodie.consistency.check.enabled': 'true',
'hoodie.datasource.hive_sync.database': database_name,
'hoodie.datasource.hive_sync.table': hudi_table_name,
'hoodie.datasource.hive_sync.enable': 'true', 'path': s3_path_hudi}
partitionDataConfig = { 'hoodie.datasource.write.keygenerator.class' :
'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.partitionpath.field': "partitionkey, partitionkey2 ",
'hoodie.datasource.hive_sync.partition_extractor_class':
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.partition_fields': "partitionkey, partitionkey2"}
incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 68,
'hoodie.datasource.write.operation': 'upsert', 'hoodie.cleaner.policy':
'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 2}
combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
glue_temp_storage = s3_path_hudi
data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(database
= database_name, table_name = kinesis_table_name, transformation_ctx =
"DataSource0", additional_options = {"startingPosition": "TRIM_HORIZON",
"inferSchema": "true"})
def processBatch(data_frame, batchId):
if (data_frame.count() > 0):
DataSource0 = DynamicFrame.fromDF(data_frame, glueContext,
"from_data_frame")
your_map = [
('eventName', 'string', 'eventName', 'string'),
('userIdentity', 'string', 'userIdentity', 'string'),
('eventSource', 'string', 'eventSource', 'string'),
('tableName', 'string', 'tableName', 'string'),
('recordFormat', 'string', 'recordFormat', 'string'),
('eventID', 'string', 'eventID', 'string'),
('dynamodb.ApproximateCreationDateTime', 'long',
'ApproximateCreationDateTime', 'long'),
('dynamodb.SizeBytes', 'long', 'SizeBytes', 'long'),
('dynamodb.NewImage.id.S', 'string', 'id', 'string'),
('dynamodb.NewImage.custName.S', 'string', 'custName', 'string'),
('dynamodb.NewImage.email.S', 'string', 'email', 'string'),
('dynamodb.NewImage.registrationDate.S', 'string',
'registrationDate', 'string'),
('awsRegion', 'string', 'awsRegion', 'string')
]
new_df = ApplyMapping.apply(frame = DataSource0, mappings=your_map,
transformation_ctx = "applymapping1")
abc = new_df.toDF()
inputDf =
abc.withColumn('update_ts_dms',abc["registrationDate"]).withColumn('partitionkey',abc["id"].substr(-1,1)).withColumn('partitionkey2',abc["id"].substr(-2,1))
inputDf = inputDf.filter("eventName !='REMOVE' AND id
!='{}'".format(abc['id']))
# glueContext.write_dynamic_frame.from_options(frame =
DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type =
"marketplace.spark", connection_options = combinedConf)
glueContext.write_dynamic_frame.from_options(frame =
DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type =
"custom.spark", connection_options = combinedConf)
glueContext.forEachBatch(frame = data_frame_DataSource0, batch_function =
processBatch, options = {"windowSize": "10 seconds", "checkpointLocation":
s3_path_spark})
job.commit()
```
Currently doing something like this! any inputs?
```
inputDf = inputDf.filter("eventName !='REMOVE' AND id
!='{}'".format(abc['id']))
```
But this is removing every record with either ID as the one being inserted
or with eventName 'remove'
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]