chenxusheng created HUDI-7509:
---------------------------------
Summary: CDC functionality not available: CDC write and CDC query
Key: HUDI-7509
URL: https://issues.apache.org/jira/browse/HUDI-7509
Project: Apache Hudi
Issue Type: Bug
Reporter: chenxusheng
Attachments: image-2024-03-17-16-16-40-364.png
Environment: Spark 3.2, Hudi 0.14
-- Create a table with CDC enabled
create table t1 (id int, name string)
using hudi
tblproperties (
  'hoodie.table.cdc.enabled' = 'true',
  'hoodie.table.cdc.supplemental.logging.mode' = 'DATA_BEFORE_AFTER'
);
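For reference, a minimal check (standard Spark SQL, nothing Hudi-specific assumed) to confirm the CDC properties were persisted on the table:
-- Verify the CDC-related table properties
SHOW TBLPROPERTIES t1 ('hoodie.table.cdc.enabled');
SHOW TBLPROPERTIES t1 ('hoodie.table.cdc.supplemental.logging.mode');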
-- First insert
insert into t1 SELECT 1,'a';
-- First query
SELECT * FROM hudi_table_changes('t1', 'cdc', 'earliest');
-- First query result
op    ts_ms                before    after
i     20240317160656619    NULL      {"_hoodie_record_key":"20240317160656619_0_0","_hoodie_partition_path":"","_hoodie_file_name":"9e41b4db-fc19-4d5c-a4d5-d71902210806-0_0-13-12_20240317160656619.parquet","_hoodie_commit_seqno":"20240317160656619_0_0","name":"a","_hoodie_commit_time":"20240317160656619","id":1}
!image-2024-03-17-16-16-40-364.png!
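For reference, the same table-valued function should also accept an optional end instant to bound the range; the four-argument form below is an assumption based on the hudi_table_changes signature described in the Hudi 0.14 docs, using the commit time returned above as the end instant:
-- Bound the CDC read to a commit range (end instant is optional)
SELECT * FROM hudi_table_changes('t1', 'cdc', 'earliest', '20240317160656619');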
-- Second insert
insert into t1 SELECT 2,'b';
-- Second query
SELECT * FROM hudi_table_changes('t1', 'cdc', 'earliest');
-- Second query result
org.apache.hudi.exception.HoodieException: There should be a cdc log file.
    at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.parseWriteStat(HoodieCDCExtractor.java:273)
    at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.lambda$extractCDCFileSplits$1(HoodieCDCExtractor.java:129)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.extractCDCFileSplits(HoodieCDCExtractor.java:124)
    at org.apache.hudi.cdc.CDCRelation.buildScan0(CDCRelation.scala:105)
    at org.apache.hudi.cdc.CDCRelation.buildScan(CDCRelation.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:323)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:357)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:413)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:356)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:323)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:453)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:144)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:144)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:137)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:157)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:157)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:150)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:201)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:246)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:215)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:69)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:287)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
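For reference, two follow-up reads that may help narrow down where the failure starts; the 'latest_state' query type for hudi_table_changes is taken from the Hudi 0.14 SQL documentation and is an assumption for this build:
-- Snapshot read of the table after the second insert, for comparison
SELECT * FROM t1;
-- Incremental read without the CDC format, for comparison with the failing 'cdc' query
SELECT * FROM hudi_table_changes('t1', 'latest_state', 'earliest');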