mahesh2247 commented on issue #7688:
URL: https://github.com/apache/hudi/issues/7688#issuecomment-1386892654

   Trying with this code below for deleting (using Glue Job)
   ```
   import sys
   from awsglue.transforms import *
   from awsglue.utils import getResolvedOptions
   from pyspark.sql.session import SparkSession
   from pyspark.context import SparkContext
   from awsglue.context import GlueContext
   from awsglue.job import Job
   from pyspark.sql import DataFrame, Row
   from pyspark.sql.functions import * 
   from pyspark.sql.functions import col, to_timestamp, 
monotonically_increasing_id, to_date, when
   import datetime
   from pyspark.sql import functions as F
   from awsglue import DynamicFrame
   
   import boto3
   
   ## @params: [JOB_NAME]
   args = getResolvedOptions(sys.argv, ["JOB_NAME", "database_name", 
"kinesis_table_name", "starting_position_of_kinesis_iterator", 
"hudi_table_name", "window_size", "s3_path_hudi", "s3_path_spark" ])
   
   spark = 
SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').config('spark.sql.hive.convertMetastoreParquet','false').getOrCreate()
                       
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   job.init(args['JOB_NAME'], args)
   
   database_name = args["database_name"]
   kinesis_table_name = args["kinesis_table_name"]
   hudi_table_name = args["hudi_table_name"]
   s3_path_hudi = args["s3_path_hudi"]
   s3_path_spark = args["s3_path_spark"]
   
   commonConfig = {'hoodie.datasource.write.hive_style_partitioning' : 
'true','className' : 'org.apache.hudi', 
'hoodie.datasource.hive_sync.use_jdbc':'false', 
'hoodie.datasource.write.precombine.field': 'id', 
'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.table.name': 
hudi_table_name, 'hoodie.consistency.check.enabled': 'true', 
'hoodie.datasource.hive_sync.database': database_name, 
'hoodie.datasource.hive_sync.table': hudi_table_name, 
'hoodie.datasource.hive_sync.enable': 'true', 'path': s3_path_hudi}
   
   partitionDataConfig = { 'hoodie.datasource.write.keygenerator.class' : 
'org.apache.hudi.keygen.ComplexKeyGenerator', 
'hoodie.datasource.write.partitionpath.field': "partitionkey, partitionkey2 ", 
'hoodie.datasource.hive_sync.partition_extractor_class': 
'org.apache.hudi.hive.MultiPartKeysValueExtractor', 
'hoodie.datasource.hive_sync.partition_fields': "partitionkey, partitionkey2"}
   
   incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 68, 
'hoodie.datasource.write.operation': 'upsert', 'hoodie.cleaner.policy': 
'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 2}
   
   combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
   
   glue_temp_storage = s3_path_hudi
   
   data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(database 
= database_name, table_name = kinesis_table_name, transformation_ctx = 
"DataSource0", additional_options = {"startingPosition": "TRIM_HORIZON", 
"inferSchema": "true"})
   
   def processBatch(data_frame, batchId):
       if (data_frame.count() > 0):
   
           DataSource0 = DynamicFrame.fromDF(data_frame, glueContext, 
"from_data_frame")
           
           your_map = [
               ('eventName', 'string', 'eventName', 'string'),
               ('userIdentity', 'string', 'userIdentity', 'string'),
               ('eventSource', 'string', 'eventSource', 'string'),
               ('tableName', 'string', 'tableName', 'string'),
               ('recordFormat', 'string', 'recordFormat', 'string'),
               ('eventID', 'string', 'eventID', 'string'),
               ('dynamodb.ApproximateCreationDateTime', 'long', 
'ApproximateCreationDateTime', 'long'),
               ('dynamodb.SizeBytes', 'long', 'SizeBytes', 'long'),
               ('dynamodb.NewImage.id.S', 'string', 'id', 'string'),
               ('dynamodb.NewImage.custName.S', 'string', 'custName', 'string'),
               ('dynamodb.NewImage.email.S', 'string', 'email', 'string'),
               ('dynamodb.NewImage.registrationDate.S', 'string', 
'registrationDate', 'string'),
               ('awsRegion', 'string', 'awsRegion', 'string')
           ]
   
           new_df = ApplyMapping.apply(frame = DataSource0, mappings=your_map, 
transformation_ctx = "applymapping1")
           abc = new_df.toDF()
           
           abc = 
abc.withColumn('update_ts_dms',abc["registrationDate"]).withColumn('partitionkey',abc["id"].substr(-1,1)).withColumn('partitionkey2',abc["id"].substr(-2,1))
   
           ids = abc.where(abc.eventName == 
'REMOVE').groupBy(abc.eventName).agg(F.collect_set(abc.id).alias('ids')).collect()[0]['ids']
   
           inputDf = abc.where(~ abc.id.isin(ids))
           
           
   
           # glueContext.write_dynamic_frame.from_options(frame = 
DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type = 
"marketplace.spark", connection_options = combinedConf)
           glueContext.write_dynamic_frame.from_options(frame = 
DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type = 
"custom.spark", connection_options = combinedConf)
   
   
   glueContext.forEachBatch(frame = data_frame_DataSource0, batch_function = 
processBatch, options = {"windowSize": "10 seconds", "checkpointLocation":  
s3_path_spark})
   
   
   
   job.commit()
   ```
   
   Facing this error: error log from cloudwatch
   
   ```
   23/01/18 11:14:09 ERROR ProcessLauncher: Error from Python:Traceback (most 
recent call last):
     File "/tmp/glue_job_script.py", line 81, in <module>
       glueContext.forEachBatch(frame = data_frame_DataSource0, batch_function 
= processBatch, options = {"windowSize": "10 seconds", "checkpointLocation":  
s3_path_spark})
     File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 
678, in forEachBatch
       raise e
     File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 
668, in forEachBatch
       query.start().awaitTermination()
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", 
line 101, in awaitTermination
       return self._jsq.awaitTermination()
     File 
"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 
1305, in __call__
       answer, self.gateway_client, self.target_id, self.name)
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
117, in deco
       raise converted from None
   pyspark.sql.utils.StreamingQueryException: An exception was raised by the 
Python Proxy. Return Message: Traceback (most recent call last):
     File 
"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 
2442, in _call_proxy
       return_value = getattr(self.pool[obj_id], method)(*params)
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
196, in call
       raise e
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
193, in call
       self.func(DataFrame(jdf, self.sql_ctx), batch_id)
     File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 
653, in batch_function_with_persist
       batch_function(data_frame, batchId)
     File "/tmp/glue_job_script.py", line 71, in processBatch
       ids = abc.where(abc.eventName == 
'REMOVE').groupBy(abc.eventName).agg(F.collect_set(abc.id).alias('ids')).collect()[0]['ids']
   IndexError: list index out of range
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to