aniketnanna opened a new issue, #7191:
URL: https://github.com/apache/hudi/issues/7191
### Highlights of Issues Faced:
1. Missing Data
2. DDL changes in Hudi Tables
3. Upgrade to Newer Version
### Detailed Description of Issues:
**1. Missing Data**
a. For around 20 tables, a few records are randomly missing compared to the main AWS RDS DB: 100-200 records out of millions are not returned by Athena, although Spark SQL shows the correct counts.
b. For 1 or 2 tables, only a single record out of 170 million is missing.
c. In 1 table, 2,800 records out of 600,000 are missing.
**2. DDL Changes:**
a. Adding a column to an existing table and dropping a column from an existing table
b. Changing a table or column name
c. Unable to add or delete columns from spark.sql with ALTER TABLE ADD COLUMNS,
e.g. spark.sql('alter table db.table_name add columns(check_status string)')
**3. Upgrade to newer version:**
a. Upgrade to a newer version of Hudi in the AWS work environment (current Hudi version: 0.10.1) without reprocessing the complete data.
### Work Requirements:
1. The complete production data lake has been built, with ~8 TB of data across 120+ tables.
2. We cannot afford to miss a single record, because important business queries will run over these tables. If a missing record is identified, we need to be able to insert it with a one-off script.
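For requirement 2, one way to find (and then re-insert) missing rows is a key-level reconciliation between the source and the lake. A minimal sketch under stated assumptions: the primary keys have already been collected from the Postgres source and from the Hudi table (for example via a JDBC query and a Spark SQL or Athena query; since Athena and Spark SQL disagree here, both could be diffed separately). find_missing_keys, find_extra_keys and batched are hypothetical helpers, written in plain Python so the logic can be checked without a cluster.

```python
from typing import Iterable, Iterator, List, Set, TypeVar

# Hypothetical reconciliation helpers; key lists are assumed to be collected elsewhere.
K = TypeVar("K")


def find_missing_keys(source_keys: Iterable[K], lake_keys: Iterable[K]) -> Set[K]:
    """Keys present in the source DB but absent from the data lake table."""
    return set(source_keys) - set(lake_keys)


def find_extra_keys(source_keys: Iterable[K], lake_keys: Iterable[K]) -> Set[K]:
    """Keys present in the lake but no longer in the source (e.g. deletes not applied)."""
    return set(lake_keys) - set(source_keys)


def batched(keys: Iterable[K], size: int) -> Iterator[List[K]]:
    """Yield keys in fixed-size chunks, e.g. to build bounded IN (...) lists for a one-off re-insert."""
    if size <= 0:
        raise ValueError("size must be positive")
    chunk: List[K] = []
    for key in keys:
        chunk.append(key)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk
```

For example, with source keys [1, 2, 3, 4, 5] and lake keys [1, 3, 5, 6], find_missing_keys returns {2, 4} and find_extra_keys returns {6}; the missing keys could then be fetched from RDS in batches and upserted.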
# Details of Work Environment:
## **Migrating Postgres RDS to S3 datalake using AWS DMS:**
Postgres Version: 12.8
1. Migration from Postgres RDS to S3 using AWS DMS
**a. Source Endpoint:** Source engine: Postgres
**b. Target Endpoint:** Endpoint engine: Amazon S3; output file format: Parquet (uncompressed)
**c. Initial Load:**
For the initial/full load, one or more output files are written, depending on the size of the data.
Format of output files: Parquet (uncompressed)
Sizes of output files: vary from KB to GB, depending on the number of records
**d. CDC:**
CDC record files are written to the S3 bucket whenever data changes, with a minimum interval of 60 s.
Format of output files: Parquet (uncompressed)
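To make the full-load-plus-CDC contract concrete, the sketch below replays change events onto a full-load snapshot to produce the state the lake table should end up in, which can serve as an expected result when checking counts. Assumptions: each event carries an Op value of I, U or D (as described in the Script-1 flow), a primary key, and an update_ts_dms value stored as same-format strings that sort chronologically; replay_cdc is a hypothetical, plain-Python model, not part of DMS or Hudi.

```python
from typing import Dict, List, Tuple

# Hypothetical model. A change event: (op, primary_key, update_ts_dms, row_payload)
Event = Tuple[str, int, str, dict]


def replay_cdc(snapshot: Dict[int, dict], events: List[Event]) -> Dict[int, dict]:
    """Apply I/U/D change events to a full-load snapshot keyed by primary key.

    Events are sorted by update_ts_dms (assumed same-format strings such as
    '2022-11-14 10:00:01'); Python's sort is stable, so ties keep arrival order.
    'I' and 'U' upsert the row, 'D' removes the key if present.
    """
    state = dict(snapshot)  # do not mutate the caller's snapshot
    for op, key, _ts, row in sorted(events, key=lambda e: e[2]):
        if op in ("I", "U"):
            state[key] = row
        elif op == "D":
            state.pop(key, None)
        else:
            raise ValueError(f"unexpected Op value: {op!r}")
    return state
```

Because events are ordered by update_ts_dms rather than by file arrival, a delete followed by a re-insert of the same key leaves the re-inserted row in place.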
## **Development Environment:**
- Creating Hudi tables in the AWS Glue Data Catalog
- The complete transformation is divided into two scripts:
- Script-1: handles the initial load, CDC inserts of new records, updates of existing records, and deletes (it creates a separate catalog table for deleted records)
- Script-2: handles the delete part of CDC (deletes records with the help of the delete_catalog table)
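The split between the two scripts comes down to routing each CDC row by its Op value (I, U or D, as described in the Script-1 flow): inserts and updates go to the Hudi upsert in Script-1, while deletes are parked in the delete catalog table for Script-2. A plain-Python sketch of that routing follows; split_by_op is a hypothetical helper standing in for two DataFrame filters in the Glue job. One detail worth checking, if any rows can reach the split with a null Op: in Spark SQL, col('Op') != 'D' evaluates to null rather than true for such rows, and where() drops them, so neither branch would receive them unless the filter is written as col('Op').isNull() | (col('Op') != 'D').

```python
from typing import Iterable, List, Optional, Tuple

Row = dict  # hypothetical: one CDC record as a dict of column -> value


def split_by_op(rows: Iterable[Row]) -> Tuple[List[Row], List[Row]]:
    """Route rows into (upserts, deletes) by their 'Op' value.

    Rows whose Op is missing/None are explicitly treated as upserts here, mirroring
    col('Op').isNull() | (col('Op') != 'D') rather than a bare col('Op') != 'D'.
    """
    upserts: List[Row] = []
    deletes: List[Row] = []
    for row in rows:
        op: Optional[str] = row.get("Op")
        if op == "D":
            deletes.append(row)
        elif op in (None, "I", "U"):
            upserts.append(row)
        else:
            raise ValueError(f"unexpected Op value: {op!r}")
    return upserts, deletes
```

Every input row lands in exactly one of the two lists, which is the property the two-script design relies on.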
## Script-1 Details:
**Configurations/Libraries used in the Script:**
Glue version: 2.0 (Spark 2.4, Scala 2, Python 3)
Enabled: Use Glue data catalog as the Hive metastore
Hudi Version: 0.10.1
Dependent JARs path:
s3://hudi-files-for-gluejob/hudi-spark-bundle_2.11-0.10.1.jar,s3://hudi-files-for-gluejob/spark-avro_2.11-2.4.4.jar
Libraries:
import sys
import os
import json
from dateutil import parser, tz
from datetime import datetime
import math
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import (concat, col, lit, to_timestamp,
    year, month, lower, ceil, row_number, max, from_utc_timestamp, date_trunc, dayofmonth)
from pyspark.sql.types import BooleanType
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import StringType, TimestampType, IntegerType
import boto3
import pytz
import traceback
from botocore.exceptions import ClientError
import requests
spark = SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer').getOrCreate()
glueContext = GlueContext(spark.sparkContext)
**Flow of the Program:**
1. Check whether the table exists in the Glue Data Catalog.
2. Get the primary key.
4. Create a DynamicFrame:
inputDyf = glueContext.create_dynamic_frame_from_options(connection_type='s3',
    connection_options={'paths': [s3_path], 'groupFiles': 'none', 'recurse': True},
    format='parquet', transformation_ctx=t_ctx)
5. Convert the DynamicFrame to a DataFrame:
inputdframe = inputDyf.toDF()
6. Remove duplicates (rows that share the same primary key and update_ts_dms):
inputdframe = (inputdframe
    .withColumn('row_number', row_number().over(Window.partitionBy(inputdframe[primaryKey], inputdframe.update_ts_dms).orderBy(inputdframe[primaryKey], inputdframe.update_ts_dms)))
    .withColumn('max_row_number', max('row_number').over(Window.partitionBy(inputdframe[primaryKey], inputdframe.update_ts_dms)))
    .where(col('row_number') == col('max_row_number'))
    .drop('row_number').drop('max_row_number'))
7. Convert all timestamp columns from UTC to IST.
8. Convert specific columns to Boolean.
During migration, DMS casts boolean columns to string, so these columns are converted back to boolean.
9. For tables that need partitions, create the partition columns year, month and day from the created_date column.
10. Create the Hudi table on the full/initial load:
commonConfig = {'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'update_ts_dms',
    'hoodie.datasource.write.recordkey.field': primaryKey,
    'hoodie.table.name': tableName1,
    'hoodie.consistency.check.enabled': 'false',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': tableName1,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.parquet.max.file.size': 125829120,  # 120 MiB
    'hoodie.parquet.min.file.size': 94371840}  # 90 MiB
partitionDataConfig = {'hoodie.datasource.write.partitionpath.field': partitionKey,
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.hive_sync.partition_fields': partitionKey}
unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': shufflecount,
    'hoodie.datasource.write.operation': 'upsert'}
where shufflecount = total size of the input files in bytes / (1048576 * 500), i.e. roughly one shuffle partition per 500 MiB of input.
10.1 Hudi configuration used for the initial load of partitioned tables (i.e. when the table does not exist):
combinedConf = {**commonConfig, **partitionDataConfig, **initLoadConfig}
10.2 Hudi configuration used for the initial load of unpartitioned tables (i.e. when the table does not exist):
combinedConf = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
10.3 DataFrame write call used for both partitioned and unpartitioned tables:
outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
11. To differentiate between inserts, updates and deletes in the CDC records, a column named 'Op' was added:
for inserts, 'Op' = 'I'
for updates, 'Op' = 'U'
for deletes, 'Op' = 'D'
12. For inserts and updates, i.e. when 'Op' != 'D' (this is when the table already exists):
commonConfig = {'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'update_ts_dms',
    'hoodie.datasource.write.recordkey.field': primaryKey,
    'hoodie.table.name': tableName1,
    'hoodie.consistency.check.enabled': 'false',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': tableName1,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.parquet.max.file.size': 125829120,
    'hoodie.parquet.min.file.size': 94371840}
partitionDataConfig = {'hoodie.datasource.write.partitionpath.field': partitionKey,
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.hive_sync.partition_fields': partitionKey}
unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 1,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10}
Hudi configuration used for CDC on partitioned tables:
combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
Hudi configuration used for CDC on unpartitioned tables:
combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig}
DataFrame write call used for both partitioned and unpartitioned tables:
outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
13. For deletes, i.e. when 'Op' = 'D':
Hudi configuration used:
combinedConf = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
where:
commonConfig = {'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'update_ts_dms',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.table.name': tableName1,
    'hoodie.consistency.check.enabled': 'false',
    'hoodie.datasource.hive_sync.database': delete_database_name,
    'hoodie.datasource.hive_sync.table': tableName1,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.parquet.max.file.size': 125829120,
    'hoodie.parquet.min.file.size': 94371840}
unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 5,
    'hoodie.datasource.write.operation': 'upsert'}
DataFrame write call used:
outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
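Steps 6, 7 and 9 of the Script-1 flow, plus the shufflecount formula, can be checked in isolation. The sketch below models them in plain Python on lists of dict rows rather than Spark DataFrames, so it approximates the job's logic rather than reproducing it. Assumptions: IST is the fixed offset UTC+05:30 and naive timestamps are UTC; partition values are derived after the IST conversion (the issue does not state the order); precombine is modelled as "largest update_ts_dms wins, later row wins on ties"; rounding shufflecount up with a floor of 1 is an assumption not stated in the formula; all function names are hypothetical.

```python
import math
from datetime import datetime, timedelta, timezone
from typing import Dict, Hashable, List, Tuple

# Hypothetical plain-Python model of Script-1 steps 6, 7, 9 and the shufflecount formula.
IST = timezone(timedelta(hours=5, minutes=30))  # India has no DST, so a fixed offset suffices


def utc_to_ist(ts: datetime) -> datetime:
    """Step 7: treat a naive timestamp as UTC and convert it to IST."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(IST)


def partition_values(created_date: datetime) -> Tuple[int, int, int]:
    """Step 9: derive (year, month, day) partition values from created_date."""
    return created_date.year, created_date.month, created_date.day


def drop_exact_duplicates(rows: List[dict], pk: str) -> List[dict]:
    """Step 6: keep one row per (primary key, update_ts_dms) pair.

    Rows with the same key but different update_ts_dms are all kept, as in the
    window expression; resolving them is left to the precombine step.
    """
    seen: Dict[Tuple[Hashable, Hashable], dict] = {}
    for row in rows:
        seen[(row[pk], row["update_ts_dms"])] = row  # later duplicate wins here; arbitrary in Spark
    return list(seen.values())


def precombine_latest(rows: List[dict], pk: str) -> List[dict]:
    """Simplified precombine within one batch: keep the largest update_ts_dms per key."""
    latest: Dict[Hashable, dict] = {}
    for row in rows:
        current = latest.get(row[pk])
        if current is None or row["update_ts_dms"] >= current["update_ts_dms"]:
            latest[row[pk]] = row  # ties: the later row wins in this model
    return list(latest.values())


def shuffle_count(total_bytes: int) -> int:
    """total_size / (1048576 * 500): one partition per 500 MiB, rounded up, minimum 1."""
    return max(1, math.ceil(total_bytes / (1048576 * 500)))
```

For example, utc_to_ist(datetime(2022, 11, 14, 20, 0)) gives 2022-11-15 01:30+05:30, so partition_values returns (2022, 11, 15) rather than the (2022, 11, 14) the raw UTC value would give; whether step 9 runs before or after step 7 therefore decides which day partition a record lands in.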
## Script-2 Details:
**Configurations/Libraries used in the Script:**
Glue version: 2.0 (Spark 2.4, Scala 2, Python 3)
Enabled: Use Glue data catalog as the Hive metastore
Hudi Version: 0.10.1
Dependent JARs path:
s3://hudi-files-for-gluejob/hudi-spark-bundle_2.11-0.10.1.jar,s3://hudi-files-for-gluejob/spark-avro_2.11-2.4.4.jar
Job parameters: key: --conf, value:
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
Libraries:
import sys
import json
import boto3
import time
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
**Flow of the Program:**
1. Query the delete records from the delete_catalog table created in Script-1 using spark.sql.
2. Hudi configurations:
commonConfig = {'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'update_ts_dms',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.table.name': table_name,
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': databasename,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.parquet.max.file.size': 125829120,
    'hoodie.parquet.min.file.size': 94371840}
partitionDataConfig = {'hoodie.datasource.write.partitionpath.field': partitionKey,
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.hive_sync.partition_fields': partitionKey}
unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 1,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10}
deleteDataConfig = {'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'}
3. For partitioned tables:
combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig, **deleteDataConfig}
4. For unpartitioned tables:
combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
5. DataFrame write call:
outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
6. Delete the records from the delete_catalog table:
combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
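Script-2's delete path can be modelled the same way. With hoodie.datasource.write.payload.class set to org.apache.hudi.common.model.EmptyHoodieRecordPayload, each upserted record acts as a delete of the matching key. The plain-Python sketch below applies a batch of delete keys to the main table (steps 2 to 5) and then clears the applied rows from the delete catalog (step 6). It is a simplified model under stated assumptions: tables are dicts keyed by primary key, partition paths are ignored, and apply_deletes and clear_processed are hypothetical names.

```python
from typing import Dict, Hashable, Iterable, Set

# Hypothetical plain-Python model of Script-2; tables are dicts keyed by primary key.


def apply_deletes(table: Dict[Hashable, dict], delete_keys: Iterable[Hashable]) -> Dict[Hashable, dict]:
    """Model of an upsert with EmptyHoodieRecordPayload: every key in the batch is removed.

    The delete is unconditional in this model: update_ts_dms is not compared, so a key
    that was re-inserted after its delete was captured is also removed if it is still
    in the delete batch.
    """
    to_delete: Set[Hashable] = set(delete_keys)
    return {k: v for k, v in table.items() if k not in to_delete}


def clear_processed(delete_catalog: Dict[Hashable, dict], processed: Iterable[Hashable]) -> Dict[Hashable, dict]:
    """Step 6: remove the delete records that have just been applied from the delete catalog."""
    done: Set[Hashable] = set(processed)
    return {k: v for k, v in delete_catalog.items() if k not in done}
```

Both functions return new dicts instead of mutating their inputs, mirroring how each Hudi write produces a new commit rather than editing files in place.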