mahesh2247 commented on issue #7688:
URL: https://github.com/apache/hudi/issues/7688#issuecomment-1386892654
Trying with this code below for deleting (using Glue Job)
```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import *
from pyspark.sql.functions import col, to_timestamp,
monotonically_increasing_id, to_date, when
import datetime
from pyspark.sql import functions as F
from awsglue import DynamicFrame
import boto3
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ["JOB_NAME", "database_name",
"kinesis_table_name", "starting_position_of_kinesis_iterator",
"hudi_table_name", "window_size", "s3_path_hudi", "s3_path_spark" ])
spark =
SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').config('spark.sql.hive.convertMetastoreParquet','false').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
database_name = args["database_name"]
kinesis_table_name = args["kinesis_table_name"]
hudi_table_name = args["hudi_table_name"]
s3_path_hudi = args["s3_path_hudi"]
s3_path_spark = args["s3_path_spark"]
commonConfig = {'hoodie.datasource.write.hive_style_partitioning' :
'true','className' : 'org.apache.hudi',
'hoodie.datasource.hive_sync.use_jdbc':'false',
'hoodie.datasource.write.precombine.field': 'id',
'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.table.name':
hudi_table_name, 'hoodie.consistency.check.enabled': 'true',
'hoodie.datasource.hive_sync.database': database_name,
'hoodie.datasource.hive_sync.table': hudi_table_name,
'hoodie.datasource.hive_sync.enable': 'true', 'path': s3_path_hudi}
partitionDataConfig = { 'hoodie.datasource.write.keygenerator.class' :
'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.partitionpath.field': "partitionkey, partitionkey2 ",
'hoodie.datasource.hive_sync.partition_extractor_class':
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.partition_fields': "partitionkey, partitionkey2"}
incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 68,
'hoodie.datasource.write.operation': 'upsert', 'hoodie.cleaner.policy':
'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 2}
combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
glue_temp_storage = s3_path_hudi
data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(database
= database_name, table_name = kinesis_table_name, transformation_ctx =
"DataSource0", additional_options = {"startingPosition": "TRIM_HORIZON",
"inferSchema": "true"})
def processBatch(data_frame, batchId):
if (data_frame.count() > 0):
DataSource0 = DynamicFrame.fromDF(data_frame, glueContext,
"from_data_frame")
your_map = [
('eventName', 'string', 'eventName', 'string'),
('userIdentity', 'string', 'userIdentity', 'string'),
('eventSource', 'string', 'eventSource', 'string'),
('tableName', 'string', 'tableName', 'string'),
('recordFormat', 'string', 'recordFormat', 'string'),
('eventID', 'string', 'eventID', 'string'),
('dynamodb.ApproximateCreationDateTime', 'long',
'ApproximateCreationDateTime', 'long'),
('dynamodb.SizeBytes', 'long', 'SizeBytes', 'long'),
('dynamodb.NewImage.id.S', 'string', 'id', 'string'),
('dynamodb.NewImage.custName.S', 'string', 'custName', 'string'),
('dynamodb.NewImage.email.S', 'string', 'email', 'string'),
('dynamodb.NewImage.registrationDate.S', 'string',
'registrationDate', 'string'),
('awsRegion', 'string', 'awsRegion', 'string')
]
new_df = ApplyMapping.apply(frame = DataSource0, mappings=your_map,
transformation_ctx = "applymapping1")
abc = new_df.toDF()
abc =
abc.withColumn('update_ts_dms',abc["registrationDate"]).withColumn('partitionkey',abc["id"].substr(-1,1)).withColumn('partitionkey2',abc["id"].substr(-2,1))
ids = abc.where(abc.eventName ==
'REMOVE').groupBy(abc.eventName).agg(F.collect_set(abc.id).alias('ids')).collect()[0]['ids']
inputDf = abc.where(~ abc.id.isin(ids))
# glueContext.write_dynamic_frame.from_options(frame =
DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type =
"marketplace.spark", connection_options = combinedConf)
glueContext.write_dynamic_frame.from_options(frame =
DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type =
"custom.spark", connection_options = combinedConf)
glueContext.forEachBatch(frame = data_frame_DataSource0, batch_function =
processBatch, options = {"windowSize": "10 seconds", "checkpointLocation":
s3_path_spark})
job.commit()
```
Facing this error: error log from cloudwatch
```
23/01/18 11:14:09 ERROR ProcessLauncher: Error from Python:Traceback (most
recent call last):
File "/tmp/glue_job_script.py", line 81, in <module>
glueContext.forEachBatch(frame = data_frame_DataSource0, batch_function
= processBatch, options = {"windowSize": "10 seconds", "checkpointLocation":
s3_path_spark})
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line
678, in forEachBatch
raise e
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line
668, in forEachBatch
query.start().awaitTermination()
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py",
line 101, in awaitTermination
return self._jsq.awaitTermination()
File
"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line
1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
117, in deco
raise converted from None
pyspark.sql.utils.StreamingQueryException: An exception was raised by the
Python Proxy. Return Message: Traceback (most recent call last):
File
"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line
2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
196, in call
raise e
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
193, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line
653, in batch_function_with_persist
batch_function(data_frame, batchId)
File "/tmp/glue_job_script.py", line 71, in processBatch
ids = abc.where(abc.eventName ==
'REMOVE').groupBy(abc.eventName).agg(F.collect_set(abc.id).alias('ids')).collect()[0]['ids']
IndexError: list index out of range
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]