[
https://issues.apache.org/jira/browse/HUDI-7360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aditya Goenka updated HUDI-7360:
--------------------------------
Description:
GitHub Issue - [https://github.com/apache/hudi/issues/10590]
Reproducible code:
```
from typing import Any
from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Spark session configured for Hudi: Kryo serialization, the Hudi Spark 3.3
# bundle fetched through spark.jars.packages, the Hudi SQL session extension,
# and the legacy time parser policy.
# NOTE(review): the bundle pinned here is 0.13.1 while the report concerns
# 0.14.1 -- presumably the version must be bumped to reproduce; confirm.
spark = (
    SparkSession.builder
    .appName("Hudi Basics")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)
sc = spark.sparkContext

table_name = "hudi_trips_cdc"
base_path = "/tmp/test_issue_10590_4"  # Replace for whatever path

# Hudi's QuickstartUtils class, reached on the JVM side via sc._jvm.
quickstart_utils = sc._jvm.org.apache.hudi.QuickstartUtils
dataGen = quickstart_utils.DataGenerator()
# Ten generated insert records converted to a list of strings; create_df()
# parses them as JSON. The assignment is kept on one logical line: a bare
# `inserts =` followed by a line break is a syntax error.
inserts = quickstart_utils.convertToStringList(dataGen.generateInserts(10))
def create_df():
    """Return a DataFrame parsed from the module-level ``inserts`` JSON records."""
    # Distribute the records over 2 partitions before handing them to the
    # JSON reader.
    records_rdd = spark.sparkContext.parallelize(inserts, 2)
    return spark.read.json(records_rdd)
def write_data():
    """Write the generated insert records to ``base_path`` as a Hudi table.

    The write uses save mode ``"overwrite"`` and the upsert operation, with
    CDC enabled on the table.
    """
    df = create_df()
    hudi_options = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": "uuid",
        # This can be either MoR or CoW and the error will still happen.
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.partitionpath.field": "partitionpath",
        "hoodie.datasource.write.table.name": table_name,
        "hoodie.datasource.write.operation": "upsert",
        # This can be left enabled, and won't affect anything unless the
        # table is actually queried as CDC.
        "hoodie.table.cdc.enabled": "true",
        "hoodie.datasource.write.precombine.field": "ts",
        "hoodie.upsert.shuffle.parallelism": 2,
        "hoodie.insert.shuffle.parallelism": 2,
    }
    (
        df.write.format("hudi")
        .options(**hudi_options)
        .mode("overwrite")
        .save(base_path)
    )
def update_data():
    """Generate update records and append them to the Hudi table at ``base_path``."""
    update_records = quickstart_utils.convertToStringList(dataGen.generateUpdates(10))
    updates_rdd = spark.sparkContext.parallelize(update_records, 2)
    updates_df = spark.read.json(updates_rdd)
    # NOTE(review): unlike write_data(), no Hudi write options are passed
    # here -- confirm this matches the intended reproduction.
    writer = updates_df.write.format("hudi").mode("append")
    writer.save(base_path)
def incremental_query():
    """Run an incremental CDC read of the Hudi table and print the result.

    The begin instant is the smallest ``_hoodie_commit_time`` found in the
    table. The resulting DataFrame is shown and its schema printed.

    Raises:
        IndexError: if the table contains no rows, so no commit time exists.
    """
    ordered_rows: list[Row] = (
        spark.read
        .format("hudi")
        .load(base_path)
        .select(col("_hoodie_commit_time").alias("commit_time"))
        .orderBy(col("commit_time"))
        .collect()
    )
    commits: list[Any] = [row[0] for row in ordered_rows]
    begin_time = commits[0]

    incremental_read_options = {
        # Query as CDC; per the report this is what crashes in 0.14.1.
        # Comment this entry out to run a plain incremental query instead.
        'hoodie.datasource.query.incremental.format': "cdc",
        'hoodie.datasource.query.type': 'incremental',
        'hoodie.datasource.read.begin.instanttime': begin_time,
    }
    trips_incremental_df = (
        spark.read
        .format("hudi")
        .options(**incremental_read_options)
        .load(base_path)
    )

    # Error also occurs when using the "from_hudi_table_changes" in 0.14.1:
    # sql_query = f""" SELECT * FROM hudi_table_changes ('{base_path}', 'cdc', 'earliest')"""
    # trips_incremental_df = spark.sql(sql_query)

    trips_incremental_df.show()
    trips_incremental_df.printSchema()
if __name__ == "__main__":
    # Reproduction steps, in order: initial write, update, then the
    # incremental CDC read.
    for step in (write_data, update_data, incremental_query):
        step()
```
was:Github Issue - [https://github.com/apache/hudi/issues/10590]
> Incremental CDC Query after 0.14.1 upgrade giving Jackson class
> incompatibility exception
> -----------------------------------------------------------------------------------------
>
> Key: HUDI-7360
> URL: https://issues.apache.org/jira/browse/HUDI-7360
> Project: Apache Hudi
> Issue Type: Bug
> Components: reader-core
> Reporter: Aditya Goenka
> Priority: Critical
> Fix For: 1.1.0
>
>
> Github Issue - [https://github.com/apache/hudi/issues/10590]
> Reproducible code
> ```
> from typing import Any
> from pyspark import Row
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import col
> spark = SparkSession.builder \
> .appName("Hudi Basics") \
> .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
> .config("spark.jars.packages",
> "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1") \
> .config("spark.sql.extensions",
> "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
> .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
> .getOrCreate()
> sc = spark.sparkContext
> table_name = "hudi_trips_cdc"
> base_path = "/tmp/test_issue_10590_4" # Replace for whatever path
> quickstart_utils = sc._jvm.org.apache.hudi.QuickstartUtils
> dataGen = quickstart_utils.DataGenerator()
> inserts =
> sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
> def create_df():
> df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
> return df
> def write_data():
> df = create_df()
> hudi_options = {
> "hoodie.table.name": table_name,
> "hoodie.datasource.write.recordkey.field": "uuid",
> "hoodie.datasource.write.table.type": "MERGE_ON_READ", # This can be either
> MoR or CoW and the error will still happen
> "hoodie.datasource.write.partitionpath.field": "partitionpath",
> "hoodie.datasource.write.table.name": table_name,
> "hoodie.datasource.write.operation": "upsert",
> "hoodie.table.cdc.enabled": "true", # This can be left enabled, and won"t
> affect anything unless actually queried as CDC
> "hoodie.datasource.write.precombine.field": "ts",
> "hoodie.upsert.shuffle.parallelism": 2,
> "hoodie.insert.shuffle.parallelism": 2
> }
> df.write.format("hudi") \
> .options(**hudi_options) \
> .mode("overwrite") \
> .save(base_path)
> def update_data():
> updates = quickstart_utils.convertToStringList(dataGen.generateUpdates(10))
> df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
> df.write \
> .format("hudi") \
> .mode("append") \
> .save(base_path)
> def incremental_query():
> ordered_rows: list[Row] = spark.read \
> .format("hudi") \
> .load(base_path) \
> .select(col("_hoodie_commit_time").alias("commit_time")) \
> .orderBy(col("commit_time")) \
> .collect()
> commits: list[Any] = list(map(lambda row: row[0], ordered_rows))
> begin_time = commits[0]
> incremental_read_options = {
> 'hoodie.datasource.query.incremental.format': "cdc", # Uncomment this line to
> Query as CDC, crashes in 0.14.1
> 'hoodie.datasource.query.type': 'incremental',
> 'hoodie.datasource.read.begin.instanttime': begin_time,
> }
> trips_incremental_df = spark.read \
> .format("hudi") \
> .options(**incremental_read_options) \
> .load(base_path)
> # Error also occurs when using the "from_hudi_table_changes" in 0.14.1
> # sql_query = f""" SELECT * FROM hudi_table_changes ('\{base_path}', 'cdc',
> 'earliest')"""
> # trips_incremental_df = spark.sql(sql_query)
> trips_incremental_df.show()
> trips_incremental_df.printSchema()
> if __name__ == "__main__":
> write_data()
> update_data()
> incremental_query()
> ```
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)