Sam-Serpoosh commented on issue #8761:
URL: https://github.com/apache/hudi/issues/8761#issuecomment-1639008704
@ad1happy2go Thank you very much for your patience here, I appreciate it. I have
some updates on this ...
### Managed To Get The Data Flow Going 😃
Very strangely, even though I did **not** change anything in my
`KafkaConnector <> Debezium` and `Hudi DeltaStreamer` configurations, the
data flow **worked** and I could get the CDC Kafka streams/topics ingested
into **partitioned Hudi tables**!!!
I also verified that said Hudi tables can be **queried** via SparkSQL
without any issues.
### BUT, DELETE Events Cause Failure
However, I noticed that if I `DELETE` record(s), the corresponding CDC
events crash the DeltaStreamer pipeline consuming that Kafka topic. I
can reproduce this consistently. The exception looks like:
```
Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Commit 20230712172015072 failed and rolled-back !
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:197)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:192)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:573)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Commit 20230712172015072 failed and rolled-back !
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:195)
    ... 15 more
Caused by: org.apache.hudi.exception.HoodieException: Commit 20230712172015072 failed and rolled-back !
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:758)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Commit 20230712172015072 failed and rolled-back !
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:740)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:395)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:716)
    ... 4 more
```
Here are some relevant points:
- I've **ENABLED tombstone records** in my Debezium configuration and have
the following line:
```yaml
tombstones.on.delete: true
```
- I've also listened to the Kafka topic and can see that the key/value pair
for the CDC DELETE, **along with the subsequent tombstone record**, is
published as expected:
```json
// DELETED RECORD's CDC Kafka Event's Key
{
  "id": 13
}
// DELETED RECORD'S CDC Kafka Event's Value
{
  "before": {
    "<redacted>.Value": {
      "id": 13,
      "name": "",
      "age": 0,
      "created_at": null,
      "event_ts": null
    }
  },
  "after": null,
  "source": {
    "version": "2.2.0.Final",
    "connector": "postgresql",
    "name": "<redacted>",
    "ts_ms": 1689207590867,
    "snapshot": {
      "string": "false"
    },
    "db": "example_db",
    "sequence": {
      "string": "[\"115226342808\",\"115293028616\"]"
    },
    "schema": "public",
    "table": "<redacted>",
    "txId": {
      "long": 19773
    },
    "lsn": {
      "long": 115293028616
    },
    "xmin": null
  },
  "op": "d",
  "ts_ms": {
    "long": 1689207590896
  },
  "transaction": null
}
// Subsequent TOMBSTONE Event's Key
{
  "id": 13
}
// Subsequent TOMBSTONE Event's Value
null
```
- Finally, I'm running a [Hudi
FORK](https://github.com/samserpoosh/hudi/tree/samser/dbz-filter-null-events)
which is based on the `0.13.0` release.
- I've applied only **one commit** on top of that, which FILTERS OUT `null`
Kafka events in order to ignore **tombstone** events. The change was inspired by [this
comment](https://github.com/apache/hudi/issues/8519#issuecomment-1533511853)
and
[change](https://github.com/sydneyhoran/hudi/commit/b864a69e27d50424b6984f28a31c3bd99a025762)
by @sydneyhoran.
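The idea behind that commit is simple: drop any Kafka record whose value is `null` before it ever reaches deserialization. A minimal, self-contained sketch of that idea (illustrative only: plain key/value pairs stand in for Kafka's `ConsumerRecord`, and `dropTombstones` is a name I made up here; the real change lives in the fork linked above):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TombstoneFilter {

    // Keep only records that carry a CDC payload; tombstones (records with a
    // null value) are dropped before deserialization ever sees them.
    static List<Map.Entry<String, String>> dropTombstones(List<Map.Entry<String, String>> batch) {
        return batch.stream()
                .filter(record -> record.getValue() != null)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batch = List.of(
                new SimpleEntry<String, String>("{\"id\": 13}", "{\"op\": \"d\"}"), // DELETE event
                new SimpleEntry<String, String>("{\"id\": 13}", null)               // trailing tombstone
        );
        System.out.println(dropTombstones(batch).size()); // prints 1: only the DELETE event survives
    }
}
```

With `tombstones.on.delete: true`, every DELETE produces exactly this pair on the topic, so the filter only discards the null-valued follow-up record and leaves the actual `"op": "d"` event for the DeltaStreamer to process.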
### Another Issue: The "before" Content Looks Inaccurate
As you can see in the aforementioned JSON blob, the `before` field looks
like this:
```json
...
"before": {
  "<redacted>.Value": {
    "id": 13,
    "name": "",
    "age": 0,
    "created_at": null,
    "event_ts": null
  }
},
...
```
Those ^^ values are **inaccurate**. For instance, the record I **DELETED**
had a non-zero `age`, and the other fields had different values as well. Any
idea what's going on here?
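One guess I still need to verify: with the Debezium Postgres connector, the `before` image only carries full old-row values when the source table's `REPLICA IDENTITY` is set to `FULL`; otherwise non-key columns come through as type defaults (`""`, `0`, `null`), which matches what I'm seeing. If that turns out to be the cause, a DDL change along these lines should fix it (the table name is a placeholder, matching the redacted name above):

```sql
-- Assumption: the source table lives in the "public" schema (as shown in the
-- event's "source" block); <redacted> is a placeholder for the real table name.
-- With FULL, Postgres writes the entire old row to the WAL on UPDATE/DELETE,
-- so Debezium can populate "before" with real values instead of type defaults.
ALTER TABLE public.<redacted> REPLICA IDENTITY FULL;
```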
Thanks in advance!