[
https://issues.apache.org/jira/browse/HUDI-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
loukey_j updated HUDI-7143:
---------------------------
Description:
{code:sql}
sparkSession.sql("CREATE TABLE if not exists hudi_ut_schema_evolution (id INT,
version INT, name STRING, birthDate TIMESTAMP, inc_day STRING) USING HUDI
PARTITIONED BY (inc_day) TBLPROPERTIES (hoodie.table.cdc.enabled='true',
type='cow', primaryKey='id')");
20231127201042503.commit:
sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select 1 as id,
1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as timestamp) as
birthDate, '2023-10-01' as inc_day) s on t.id=s.id when matched THEN UPDATE
SET * WHEN NOT MATCHED THEN INSERT *; ");
20231127201113131.commit:
sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution ADD COLUMNS (add1 String
AFTER id); ");
20231127201124255.commit:
sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select '1' as
add1, 2 as id, 1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as
timestamp) as birthDate, '2023-10-01' as inc_day) s on t.id=s.id when matched
THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *; ");
20231127201146659.commit:
sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution DROP COLUMN add1");
20231127201157382.commit:
sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution ADD COLUMNS (add1 int)");
20231127201208532.commit:
sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select 1 as
add1, 3 as id, 1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as
timestamp) as birthDate, '2023-10-01' as inc_day) s on t.id=s.id when matched
THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *; ");
sparkSession.sql("select * from hudi_ut_schema_evolution").show(100, false);
+-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno
|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name
|id |version|name |birthDate
|add1|inc_day |
+-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
|20231127201042503 |20231127201042503_0_0|1
|inc_day=2023-10-01
|2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|1
|1 |str_1|2023-01-01 12:12:12|null|2023-10-01|
|20231127201124255 |20231127201124255_0_1|2
|inc_day=2023-10-01
|2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|2
|1 |str_1|2023-01-01 12:12:12|null|2023-10-01|
|20231127201208532 |20231127201208532_0_2|3
|inc_day=2023-10-01
|2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|3
|1 |str_1|2023-01-01 12:12:12|1 |2023-10-01|
+-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
sparkSession.sql("select * from
hudi_table_changes('hudi_ut_schema_evolution','cdc','20231127201042503','20231127201208532')").show(100,
false);
exception:
org.apache.avro.AvroTypeException: Found string, expecting union
at
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
at
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:196)
at
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:146)
at
org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
at
org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
at
org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator.hasNext(HoodieCDCLogRecordIterator.java:77)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNextInternal(HoodieCDCRDD.scala:266)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNext(HoodieCDCRDD.scala:275)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{code}
was:
{code:sql}
sparkSession.sql("CREATE TABLE if not exists hudi_ut_schema_evolution (id INT,
version INT, name STRING, birthDate TIMESTAMP, inc_day STRING) USING HUDI
PARTITIONED BY (inc_day) TBLPROPERTIES (delta.enableChangeDataFeed='true',
hoodie.table.cdc.enabled='true', type='cow', primaryKey='id')");
20231127201042503.commit:
sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select 1 as id,
1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as timestamp) as
birthDate, '2023-10-01' as inc_day) s on t.id=s.id when matched THEN UPDATE
SET * WHEN NOT MATCHED THEN INSERT *; ");
20231127201113131.commit:
sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution ADD COLUMNS (add1 String
AFTER id); ");
20231127201124255.commit:
sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select '1' as
add1, 2 as id, 1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as
timestamp) as birthDate, '2023-10-01' as inc_day) s on t.id=s.id when matched
THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *; ");
20231127201146659.commit:
sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution DROP COLUMN add1");
20231127201157382.commit:
sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution ADD COLUMNS (add1 int)");
20231127201208532.commit:
sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select 1 as
add1, 3 as id, 1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as
timestamp) as birthDate, '2023-10-01' as inc_day) s on t.id=s.id when matched
THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *; ");
sparkSession.sql("select * from hudi_ut_schema_evolution").show(100, false);
+-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno
|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name
|id |version|name |birthDate
|add1|inc_day |
+-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
|20231127201042503 |20231127201042503_0_0|1
|inc_day=2023-10-01
|2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|1
|1 |str_1|2023-01-01 12:12:12|null|2023-10-01|
|20231127201124255 |20231127201124255_0_1|2
|inc_day=2023-10-01
|2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|2
|1 |str_1|2023-01-01 12:12:12|null|2023-10-01|
|20231127201208532 |20231127201208532_0_2|3
|inc_day=2023-10-01
|2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|3
|1 |str_1|2023-01-01 12:12:12|1 |2023-10-01|
+-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
sparkSession.sql("select * from
hudi_table_changes('hudi_ut_schema_evolution','cdc','20231127201042503','20231127201208532')").show(100,
false);
org.apache.avro.AvroTypeException: Found string, expecting union
at
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
at
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:196)
at
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:146)
at
org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
at
org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
at
org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator.hasNext(HoodieCDCLogRecordIterator.java:77)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNextInternal(HoodieCDCRDD.scala:266)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNext(HoodieCDCRDD.scala:275)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{code}
> Schema evolution triggers a CDC query exception
> -----------------------------------------------
>
> Key: HUDI-7143
> URL: https://issues.apache.org/jira/browse/HUDI-7143
> Project: Apache Hudi
> Issue Type: Bug
> Components: spark
> Reporter: loukey_j
> Priority: Critical
> Labels: CDC, schema-evolution, spark
>
> {code:sql}
> sparkSession.sql("CREATE TABLE if not exists hudi_ut_schema_evolution (id
> INT, version INT, name STRING, birthDate TIMESTAMP, inc_day STRING) USING
> HUDI PARTITIONED BY (inc_day) TBLPROPERTIES (hoodie.table.cdc.enabled='true',
> type='cow', primaryKey='id')");
> 20231127201042503.commit:
> sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select 1 as
> id, 1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as timestamp)
> as birthDate, '2023-10-01' as inc_day) s on t.id=s.id when matched THEN
> UPDATE SET * WHEN NOT MATCHED THEN INSERT *; ");
> 20231127201113131.commit:
> sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution ADD COLUMNS (add1
> String AFTER id); ");
> 20231127201124255.commit:
> sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select '1' as
> add1, 2 as id, 1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as
> timestamp) as birthDate, '2023-10-01' as inc_day) s on t.id=s.id when
> matched THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *; ");
> 20231127201146659.commit:
> sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution DROP COLUMN add1");
> 20231127201157382.commit:
> sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution ADD COLUMNS (add1
> int)");
> 20231127201208532.commit:
> sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select 1 as
> add1, 3 as id, 1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as
> timestamp) as birthDate, '2023-10-01' as inc_day) s on t.id=s.id when
> matched THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *; ");
> sparkSession.sql("select * from hudi_ut_schema_evolution").show(100, false);
> +-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
> |_hoodie_commit_time|_hoodie_commit_seqno
> |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name
> |id |version|name |birthDate
> |add1|inc_day |
> +-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
> |20231127201042503 |20231127201042503_0_0|1
> |inc_day=2023-10-01
> |2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|1
> |1 |str_1|2023-01-01 12:12:12|null|2023-10-01|
> |20231127201124255 |20231127201124255_0_1|2
> |inc_day=2023-10-01
> |2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|2
> |1 |str_1|2023-01-01 12:12:12|null|2023-10-01|
> |20231127201208532 |20231127201208532_0_2|3
> |inc_day=2023-10-01
> |2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|3
> |1 |str_1|2023-01-01 12:12:12|1 |2023-10-01|
> +-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
> sparkSession.sql("select * from
> hudi_table_changes('hudi_ut_schema_evolution','cdc','20231127201042503','20231127201208532')").show(100,
> false);
> exception:
> org.apache.avro.AvroTypeException: Found string, expecting union
> at
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
> at
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at
> org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:196)
> at
> org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:146)
> at
> org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
> at
> org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
> at
> org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator.hasNext(HoodieCDCLogRecordIterator.java:77)
> at
> org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNextInternal(HoodieCDCRDD.scala:266)
> at
> org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNext(HoodieCDCRDD.scala:275)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)