aniketnanna opened a new issue, #7191:
URL: https://github.com/apache/hudi/issues/7191

   ### Highlights of the Issues We Are Facing:
   1. Missing Data
   2. DDL changes in Hudi Tables
   3. Upgrade to Newer Version
   
   ### Detailed Description of Issues:
   **1. Missing Data**
       a. For around 20 tables, a few records are randomly missing in comparison to the main AWS RDS DB: 100-200 records out of millions are not returned by Athena, while Spark SQL shows the correct counts.
       b. For 1 or 2 tables, only a single record is missing out of ~170 million records.
       c. In 1 table, 2,800 records are missing out of 600,000 records.
   
   **2. DDL Changes:**
       a. Add a column to / drop a column from an existing table
       b. Change a table/column name
       c. Not able to add or drop a column via spark.sql (ALTER TABLE ... ADD COLUMNS), e.g.
          spark.sql('alter table db.table_name add columns(check_status string)')
          (A sketch of the session setup we believe this requires follows below.)
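
   For context, our understanding is that Hudi's Spark SQL DDL support requires the Hudi session extension to be registered when the SparkSession is created; Script-2's job parameters set it, but Script-1's session (shown further below) does not. A minimal sketch of the session setup under which we would expect the ALTER TABLE above to work; the table/column names are only illustrative:

   ```python
   from pyspark.sql.session import SparkSession

   # Hudi's Spark SQL DDL needs these settings at session creation time
   spark = (SparkSession.builder
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
       .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
       .config('spark.sql.hive.convertMetastoreParquet', 'false')
       .getOrCreate())

   # Illustrative table/column names
   spark.sql('ALTER TABLE db.table_name ADD COLUMNS (check_status string)')
   ```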
   
   **3. Upgrade to a newer version:**
       a. Upgrade to a newer Hudi version in our AWS work environment (current Hudi version: 0.10.1) without reprocessing the complete data.
   
   ### Work Requirements:
   1. The complete production data lake has been built with ~8 TB of data across 120+ tables.
   2. We cannot afford to miss a single record, given the business-critical queries that run over these tables. Alternatively, if a missing record is identified, we need to be able to insert it via a one-off script.
   
   # Details of Work Environment:
   
   ## **Migrating Postgres RDS to S3 datalake using AWS DMS:**
   Postgres Version: 12.8
   1. Migration from Postgres RDS to S3 using Amazon DMS
   **a. Source Endpoint:** Source Engine: Postgres
   **b. Target Endpoint:** Endpoint engine: Amazon S3; output file format: Parquet (uncompressed)
   **c. Initial Load:**
       For the initial/full load, one or more output files are produced depending on the size of the data.
       Output file format: Parquet (uncompressed)
       Output file sizes: vary from KBs to GBs based on the number of records
   **d. CDC:**
       CDC record files are written to the S3 bucket whenever data changes, at a minimum interval of 60s.
       Output file format: Parquet (uncompressed)
   
   ## **Development Environment:**
   - Creating Hudi tables in the AWS Glue Catalog
   - The complete transformation is divided into two scripts:
   - Script-1: Handles the initial load and the CDC inserts of new records, updates to existing records, and deletes (writes deleted records to a separate delete catalog table)
   - Script-2: Handles the delete part of CDC (deletes records with the help of the delete catalog table created by Script-1)
   
   ## Script-1 Details:
       **Configurations/Libraries used in the Script:**
            Glue version: 2.0 (Spark 2.4, Scala 2, Python 3)
            Glue Data Catalog used as the Hive metastore
            Hudi version: 0.10.1
            Dependent JARs path: s3://hudi-files-for-gluejob/hudi-spark-bundle_2.11-0.10.1.jar,s3://hudi-files-for-gluejob/spark-avro_2.11-2.4.4.jar
   
           Libraries:
           import sys
           import os
           import json
           from dateutil import parser, tz
           from datetime import datetime
           import math
           from pyspark.context import SparkContext
           from pyspark.sql.session import SparkSession
            from pyspark.sql.functions import (concat, col, lit, to_timestamp, year, month,
                lower, ceil, row_number, max, from_utc_timestamp, date_trunc, dayofmonth)
           from pyspark.sql.types import BooleanType
           from pyspark.sql import SQLContext
           from pyspark.sql.window import Window
   
           from awsglue.utils import getResolvedOptions
           from awsglue.context import GlueContext
           from awsglue.job import Job
           from awsglue.dynamicframe import DynamicFrame
           from pyspark.sql.types import StringType, TimestampType, IntegerType
           import boto3
           import pytz
           import traceback
           from botocore.exceptions import ClientError
           import requests
            spark = SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer').getOrCreate()
           glueContext = GlueContext(spark.sparkContext)
   
       **Flow of the Program:**
        1. Check whether the table exists in the Glue catalog.
        2. Get the primary key.
        3. Create a dynamic frame
            inputDyf = glueContext.create_dynamic_frame_from_options(
                connection_type='s3',
                connection_options={'paths': [s3_path], 'groupFiles': 'none', 'recurse': True},
                format='parquet', transformation_ctx=t_ctx)
        4. Convert the dynamic frame to a data frame
            inputdframe = inputDyf.toDF()
        5. Remove duplicates
            # Exact duplicates share the same primary key and DMS update timestamp;
            # keeping only the row whose row_number equals the window max leaves one copy per pair.
            dedup_window = Window.partitionBy(inputdframe[primaryKey], inputdframe.update_ts_dms)
            inputdframe = (inputdframe
                .withColumn('row_number', row_number().over(
                    dedup_window.orderBy(inputdframe[primaryKey], inputdframe.update_ts_dms)))
                .withColumn('max_row_number', max('row_number').over(dedup_window))
                .where(col('row_number') == col('max_row_number'))
                .drop('row_number').drop('max_row_number'))
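
   As an aside, assuming the goal is simply one row per (primaryKey, update_ts_dms) pair, the same dedup could be expressed more compactly; a sketch:

   ```python
   # Equivalent, simpler dedup: keep one arbitrary row per key/timestamp pair
   inputdframe = inputdframe.dropDuplicates([primaryKey, 'update_ts_dms'])
   ```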
        6. Convert all timestamp columns from UTC to IST.
        7. Convert specific columns to Boolean: during migration DMS casts boolean columns to string, so we convert these columns back to boolean.
        8. For tables that are to be partitioned, create partition columns year, month, and day from the created_date column (a combined sketch of steps 6-8 follows).
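
   A minimal sketch of steps 6-8 as implemented in the script; the column names created_at and is_active are illustrative, created_date is the column named above:

   ```python
   from pyspark.sql.functions import from_utc_timestamp, year, month, dayofmonth, col
   from pyspark.sql.types import BooleanType

   # Step 6: shift a UTC timestamp column to IST (illustrative column name)
   outputDf = inputdframe.withColumn('created_at', from_utc_timestamp(col('created_at'), 'Asia/Kolkata'))

   # Step 7: DMS delivered booleans as strings; cast them back (illustrative column name)
   outputDf = outputDf.withColumn('is_active', col('is_active').cast(BooleanType()))

   # Step 8: derive year/month/day partition columns from created_date
   outputDf = (outputDf
       .withColumn('year', year(col('created_date')))
       .withColumn('month', month(col('created_date')))
       .withColumn('day', dayofmonth(col('created_date'))))
   ```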
        9. Create the Hudi table on the full/initial load
            commonConfig = {
                'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
                'className': 'org.apache.hudi',
                'hoodie.datasource.hive_sync.use_jdbc': 'false',
                'hoodie.datasource.write.precombine.field': 'update_ts_dms',
                'hoodie.datasource.write.recordkey.field': primaryKey,
                'hoodie.table.name': tableName1,
                'hoodie.consistency.check.enabled': 'false',
                'hoodie.datasource.hive_sync.database': database_name,
                'hoodie.datasource.hive_sync.table': tableName1,
                'hoodie.datasource.hive_sync.enable': 'true',
                'hoodie.parquet.max.file.size': 125829120,
                'hoodie.parquet.min.file.size': 94371840}
            partitionDataConfig = {
                'hoodie.datasource.write.partitionpath.field': partitionKey,
                'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
                'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
                'hoodie.datasource.hive_sync.partition_fields': partitionKey}
            unpartitionDataConfig = {
                'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
                'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
            initLoadConfig = {
                'hoodie.bulkinsert.shuffle.parallelism': shufflecount,
                'hoodie.datasource.write.operation': 'upsert'}
            where shufflecount = total_size_of_files / (1048576 * 500), i.e. the total input size in bytes divided by 500 MiB (for example, ~102 for a 50 GB table).
            9.1 Hudi configurations used for the initial load of a partitioned table (i.e. when the table does not exist)
                combinedConf = {**commonConfig, **partitionDataConfig, **initLoadConfig}
            9.2 Hudi configurations used for the initial load of an unpartitioned table (i.e. when the table does not exist)
                combinedConf = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
            9.3 DataFrame write syntax used for both partitioned and unpartitioned tables
                outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
        10. To differentiate between insert, update, and delete records in CDC, a column named 'Op' is added:
            for inserts, 'Op' = 'I'
            for updates, 'Op' = 'U'
            for deletes, 'Op' = 'D'
            (A sketch of how the frame is split on this column follows.)
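
   A minimal sketch, with illustrative variable names:

   ```python
   from pyspark.sql.functions import col

   # Inserts and updates go through the upsert path (step 11)
   outputDf = inputdframe.filter(col('Op') != 'D')

   # Deletes are collected for the delete catalog table (step 12)
   outputDf_deleted = inputdframe.filter(col('Op') == 'D')
   ```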
        11. For inserts and updates, i.e. when 'Op' != 'D' (this is when the table already exists):
              commonConfig = {
                  'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
                  'className': 'org.apache.hudi',
                  'hoodie.datasource.hive_sync.use_jdbc': 'false',
                  'hoodie.datasource.write.precombine.field': 'update_ts_dms',
                  'hoodie.datasource.write.recordkey.field': primaryKey,
                  'hoodie.table.name': tableName1,
                  'hoodie.consistency.check.enabled': 'false',
                  'hoodie.datasource.hive_sync.database': database_name,
                  'hoodie.datasource.hive_sync.table': tableName1,
                  'hoodie.datasource.hive_sync.enable': 'true',
                  'hoodie.parquet.max.file.size': 125829120,
                  'hoodie.parquet.min.file.size': 94371840}
              partitionDataConfig = {
                  'hoodie.datasource.write.partitionpath.field': partitionKey,
                  'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
                  'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
                  'hoodie.datasource.hive_sync.partition_fields': partitionKey}
              unpartitionDataConfig = {
                  'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
                  'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
              incrementalConfig = {
                  'hoodie.upsert.shuffle.parallelism': 1,
                  'hoodie.datasource.write.operation': 'upsert',
                  'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
                  'hoodie.cleaner.commits.retained': 10}
            Hudi configurations used for CDC on partitioned tables
                combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
            Hudi configurations used for CDC on unpartitioned tables
                combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig}
            DataFrame write syntax used for both partitioned and unpartitioned tables
                outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
        12. For deletes, i.e. when 'Op' = 'D'
            Hudi configurations used:
            combinedConf = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
            where,
            commonConfig = {
                'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
                'className': 'org.apache.hudi',
                'hoodie.datasource.hive_sync.use_jdbc': 'false',
                'hoodie.datasource.write.precombine.field': 'update_ts_dms',
                'hoodie.datasource.write.recordkey.field': 'id',
                'hoodie.table.name': tableName1,
                'hoodie.consistency.check.enabled': 'false',
                'hoodie.datasource.hive_sync.database': delete_database_name,
                'hoodie.datasource.hive_sync.table': tableName1,
                'hoodie.datasource.hive_sync.enable': 'true',
                'hoodie.parquet.max.file.size': 125829120,
                'hoodie.parquet.min.file.size': 94371840}
            unpartitionDataConfig = {
                'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
                'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
            initLoadConfig = {
                'hoodie.bulkinsert.shuffle.parallelism': 5,
                'hoodie.datasource.write.operation': 'upsert'}
            DataFrame write syntax used:
                outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
   
   ## Script-2 Details:
       **Configurations/Libraries used in the Script:**
            Glue version: 2.0 (Spark 2.4, Scala 2, Python 3)
            Glue Data Catalog used as the Hive metastore
            Hudi version: 0.10.1
            Dependent JARs path: s3://hudi-files-for-gluejob/hudi-spark-bundle_2.11-0.10.1.jar,s3://hudi-files-for-gluejob/spark-avro_2.11-2.4.4.jar
            Job parameters:
                Key: --conf
                Value: spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
   
           Libraries:
           import sys
           import json
           import boto3
           import time
           from awsglue.transforms import *
           from awsglue.utils import getResolvedOptions
           from pyspark.context import SparkContext
           from awsglue.context import GlueContext
           from awsglue.job import Job
           from pyspark.sql.session import SparkSession
           from pyspark.sql.functions import *
   
       **Flow of the Program:**
        1. Using spark.sql, query the deleted records from the delete catalog created in Script-1 (a sketch follows).
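
   A minimal sketch of this lookup; the database/table variable names below are illustrative:

   ```python
   # Read the deleted-record keys that Script-1 wrote to the delete catalog table
   outputDf_deleted = spark.sql(f'SELECT * FROM {delete_database_name}.{table_name}')
   ```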
       2. Hudi Configurations:
            commonConfig = {
                'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
                'className': 'org.apache.hudi',
                'hoodie.datasource.hive_sync.use_jdbc': 'false',
                'hoodie.datasource.write.precombine.field': 'update_ts_dms',
                'hoodie.datasource.write.recordkey.field': 'id',
                'hoodie.table.name': table_name,
                'hoodie.consistency.check.enabled': 'true',
                'hoodie.datasource.hive_sync.database': databasename,
                'hoodie.datasource.hive_sync.table': table_name,
                'hoodie.datasource.hive_sync.enable': 'true',
                'hoodie.parquet.max.file.size': 125829120,
                'hoodie.parquet.min.file.size': 94371840}
            partitionDataConfig = {
                'hoodie.datasource.write.partitionpath.field': partitionKey,
                'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
                'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
                'hoodie.datasource.hive_sync.partition_fields': partitionKey}
            unpartitionDataConfig = {
                'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
                'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
            incrementalConfig = {
                'hoodie.upsert.shuffle.parallelism': 1,
                'hoodie.datasource.write.operation': 'upsert',
                'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
                'hoodie.cleaner.commits.retained': 10}
            deleteDataConfig = {
                'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'}
        3. For partitioned tables:
            combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig, **deleteDataConfig}
        4. For unpartitioned tables:
            combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
        5. DataFrame write syntax:
            outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
        6. Delete records from the delete catalog:
            combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
            outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
   

