soumilshah1995 opened a new issue, #8033: URL: https://github.com/apache/hudi/issues/8033
I am trying to learn the new CDC feature Hudi has released, described in RFC-51: https://github.com/apache/hudi/blob/master/rfc/rfc-51/rfc-51.md

### Sample Code

```python
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from datetime import datetime
    from functools import reduce
    from faker import Faker
except Exception as e:
    pass

SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

db_name = "hudidb"
table_name = "hudi_cdc_table"

recordkey = 'uuid'
precombine = 'date'
path = f"file:///C:/tmp/{db_name}/{table_name}"

method = 'upsert'
table_type = "COPY_ON_WRITE"  # COPY_ON_WRITE | MERGE_ON_READ

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,

    # CDC settings from RFC-51
    'hoodie.table.cdc.enabled': 'true',
    'hoodie.table.cdc.supplemental.logging.mode': 'DATA_AFTER',
}

# first batch of inserts (first commit)
data_items = [
    (1, "insert 1", 111, "2020-01-06 12:12:12"),
    (2, "insert 2", 22, "2020-01-06 12:12:12"),
]
columns = ["uuid", "message", "precomb", "date"]
spark_df = spark.createDataFrame(data=data_items, schema=columns)

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)

# second batch of inserts (second commit)
data_items = [
    (3, "insert 1", 111, "2020-01-06 12:12:12"),
    (4, "insert 2", 22, "2020-01-06 12:12:12"),
]
columns = ["uuid", "message", "precomb", "date"]
spark_df = spark.createDataFrame(data=data_items, schema=columns)

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)

# ======================== CDC ==============================
spark. \
    read. \
    format("hudi"). \
    load(path). \
    createOrReplaceTempView("hudi_snapshot")

commits = list(map(
    lambda row: row[0],
    spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_snapshot order by commitTime")
        .limit(50)
        .collect()
))
beginTime = commits[len(commits) - 2]  # commit time we are interested in
print(f"commits : {commits} beginTime : {beginTime} ")

incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.query.incremental.format': 'cdc',
    'hoodie.datasource.read.begin.instanttime': beginTime,
    'hoodie.datasource.read.end.instanttime': "20230223194341503",
}

IncrementalDF = spark.read.format("hudi"). \
    options(**incremental_read_options). \
    load(path)

IncrementalDF.createOrReplaceTempView("hudi_incremental")

spark.sql("select * from hudi_incremental").show()
```

* This feature was just announced, and I am trying to learn how exactly it works so I can teach the community and pass it on to others via my YouTube channel.
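For reference, below is a rough sketch of how I expect to consume the CDC output once the incremental format is set to `cdc`. It continues from the script above and reuses `spark`, `path`, and `beginTime`. The change-record column names mentioned in the comments (`op`, `ts_ms`, `before`, `after`) are my assumption based on the payload schema described in RFC-51; I have not verified them against the 0.13.0 release, so the sketch prints the schema first.

```python
# Hypothetical follow-up to the script above: query the same table in CDC
# format and inspect the change records. The column names mentioned below
# (op, ts_ms, before, after) are assumed from the RFC-51 payload schema
# and may differ in the actual release.
cdc_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.query.incremental.format': 'cdc',
    'hoodie.datasource.read.begin.instanttime': beginTime,
}

cdc_df = spark.read.format("hudi"). \
    options(**cdc_read_options). \
    load(path)

# Print the schema first to confirm the actual CDC column layout.
cdc_df.printSchema()

# Each row should describe one change: the operation type ('i', 'u', 'd'
# per RFC-51) plus the record image(s) captured according to the
# supplemental logging mode configured at write time.
cdc_df.show(truncate=False)
```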
