phantomcoder62 commented on issue #8040:
URL: https://github.com/apache/hudi/issues/8040#issuecomment-1448021928
> sample code
>
> ```
> import sys
> from awsglue.transforms import *
> from awsglue.utils import getResolvedOptions
> from pyspark.sql.session import SparkSession
> from pyspark.context import SparkContext
> from awsglue.context import GlueContext
> from awsglue.job import Job
> from pyspark.sql import DataFrame, Row
> from pyspark.sql.functions import *
> from pyspark.sql.functions import col, to_timestamp,
monotonically_increasing_id, to_date, when
> import datetime
> from awsglue import DynamicFrame
>
> import boto3
>
> ## @params: [JOB_NAME]
> args = getResolvedOptions(sys.argv,
> ["JOB_NAME", "database_name",
"kinesis_table_name", "starting_position_of_kinesis_iterator",
> "hudi_table_name", "window_size",
"s3_path_hudi", "s3_path_spark"])
>
> spark = SparkSession.builder.config('spark.serializer',
'org.apache.spark.serializer.KryoSerializer').config(
> 'spark.sql.hive.convertMetastoreParquet', 'false').getOrCreate()
>
> sc = spark.sparkContext
> glueContext = GlueContext(sc)
> job = Job(glueContext)
> job.init(args['JOB_NAME'], args)
>
> database_name = args["database_name"]
> kinesis_table_name = args["kinesis_table_name"]
> hudi_table_name = args["hudi_table_name"]
> s3_path_hudi = args["s3_path_hudi"]
> s3_path_spark = args["s3_path_spark"]
>
> print("***********")
> print(f"""
> database_name {database_name}
> kinesis_table_name = {kinesis_table_name}
> hudi_table_name ={hudi_table_name}
> s3_path_hudi = {s3_path_hudi}
> s3_path_spark = {s3_path_spark}
> """)
> # can be set to "latest", "trim_horizon" or "earliest"
> starting_position_of_kinesis_iterator =
args["starting_position_of_kinesis_iterator"]
>
> # The amount of time to spend processing each batch
> window_size = args["window_size"]
>
> data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
> database=database_name,
> table_name=kinesis_table_name,
> transformation_ctx="DataSource0",
> additional_options={"inferSchema": "true", "startingPosition":
starting_position_of_kinesis_iterator}
> )
>
> # config
> commonConfig = {
> 'path': s3_path_hudi
> }
>
> hudiWriteConfig = {
> 'className': 'org.apache.hudi',
> 'hoodie.table.name': hudi_table_name,
> 'hoodie.datasource.write.operation': 'upsert',
> 'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
> 'hoodie.datasource.write.precombine.field': 'date',
> 'hoodie.datasource.write.recordkey.field': '_id',
> 'hoodie.datasource.write.partitionpath.field':
'year:SIMPLE,month:SIMPLE,day:SIMPLE',
> 'hoodie.datasource.write.keygenerator.class':
'org.apache.hudi.keygen.CustomKeyGenerator',
> 'hoodie.deltastreamer.keygen.timebased.timestamp.type': 'MIXED',
> 'hoodie.deltastreamer.keygen.timebased.input.dateformat': 'yyyy-mm-dd',
> 'hoodie.deltastreamer.keygen.timebased.output.dateformat': 'yyyy/MM/dd'
> }
>
> hudiGlueConfig = {
> 'hoodie.datasource.hive_sync.enable': 'true',
> 'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
> 'hoodie.datasource.hive_sync.database': database_name,
> 'hoodie.datasource.hive_sync.table': hudi_table_name,
> 'hoodie.datasource.hive_sync.use_jdbc': 'false',
> 'hoodie.datasource.write.hive_style_partitioning': 'true',
> 'hoodie.datasource.hive_sync.partition_extractor_class':
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
> 'hoodie.datasource.hive_sync.partition_fields': 'year,month,day'
> }
>
> combinedConf = {
> **commonConfig,
> **hudiWriteConfig,
> **hudiGlueConfig
> }
>
>
> # ensure the incoming record has the correct current schema; brand-new
columns are fine, but if a column exists in the current schema and not in the
incoming record, it must be added manually before inserting
> def evolveSchema(kinesis_df, table, forcecast=False):
> try:
> # get existing table's schema
> glue_catalog_df = spark.sql("SELECT * FROM " + table + " LIMIT 0")
> # sanitize for hudi specific system columns
> columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno',
'_hoodie_record_key',
> '_hoodie_partition_path', '_hoodie_file_name']
> glue_catalog_df_sanitized = glue_catalog_df.drop(*columns_to_drop)
> if (kinesis_df.schema != glue_catalog_df_sanitized.schema):
> merged_df = kinesis_df.unionByName(glue_catalog_df_sanitized,
allowMissingColumns=True)
> return (merged_df)
> except Exception as e:
> print(e)
> return (kinesis_df)
>
>
> def processBatch(data_frame, batchId):
> if (data_frame.count() > 0):
> kinesis_dynamic_frame = DynamicFrame.fromDF(data_frame,
glueContext, "from_kinesis_data_frame")
> kinesis_data_frame = kinesis_dynamic_frame.toDF()
>
> kinesis_data_frame = evolveSchema(kinesis_data_frame,
database_name + '.' + hudi_table_name, False)
>
> glueContext.write_dynamic_frame.from_options(
> frame=DynamicFrame.fromDF(kinesis_data_frame, glueContext,
"evolved_kinesis_data_frame"),
> connection_type="custom.spark",
> connection_options=combinedConf
> )
>
>
> glueContext.forEachBatch(
> frame=data_frame_DataSource0,
> batch_function=processBatch,
> options={
> "windowSize": window_size,
> "checkpointLocation": s3_path_spark
> }
> )
>
> job.commit()
> ```
Soumil, this solution is **incorrect** for our problem and absolutely
**useless** for our use case: the logic here targets a COW Hudi table and will
not work for **MOR tables**, because compaction in MOR works differently. Here
you are merging the old data in Hudi, creating a new dataframe without the
deleted column, and pushing it into a COW Hudi table; that works for COW, since
COW writes new parquet files on every commit. In MOR, however, a .log file is
created on every iteration, and the old data is only merged into a parquet file
during compaction. We still tested the code you gave us and, as expected, we
get the same error once compaction happens.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]