cb149 opened a new issue #3161:
URL: https://github.com/apache/hudi/issues/3161
**Describe the problem you faced**
I have a Spark Structured Streaming application running on YARN, which I
trigger once at the start of every hour. I have now discovered missing messages
in my Hudi table, which is confusing, since Structured Streaming proclaims that _the
system ensures end-to-end exactly-once fault-tolerance guarantees_.
Effectively, the application runs:
```scala
spark
  .readStream
  .format("kafka")
  .options(options) // the usual, bootstrapServers etc.
  .load()
  .selectExpr(s"CAST(key AS STRING) as $name", "CAST(value AS STRING)")
  // multiple .withColumn("xyz", from_json(col("value"), struct)) and explode()
  .writeStream
  .trigger(Trigger.Once())
  .format("hudi")
  .option("path", config.outputPath)
  .option("checkpointLocation", config.checkpointPath)
  .options(getHudiConfig())
  .outputMode(OutputMode.Append)
  .start()
```
The commit operation is `upsert`, and my Hudi config settings include (among the
usual):
```scala
HoodieWriteConfig.FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP -> "false",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "ts",
DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY -> "false",
DataSourceWriteOptions.STREAMING_RETRY_CNT_OPT_KEY -> "0"
```
and my Spark settings include:
```
"spark.rdd.compress": "true",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.unsafe.sorter.spill.read.ahead.enabled": "false",
```
I have rerun the exact same application for the Kafka offsets containing the
missing messages (about 2 days after the application originally ran for those
offsets), writing to a new Hudi table, and the previously missing messages were
present in that one. So it seems to me there is an issue with Structured
Streaming from Kafka to Hudi, specifically when the query runs shortly after a
message was ingested. Looking into the log of the application run that missed
the message, I can see no errors, but the following:
```
21/06/24 08:04:06 INFO hudi.HoodieStreamingSink: Micro batch id=322 succeeded for commit=20210624080101
21/06/24 08:04:06 INFO hudi.HoodieStreamingSink: Micro batch id=322 succeeded
21/06/24 08:04:06 INFO streaming.CheckpointFileManager: Writing atomically to hdfs://{path}/checkpoint/commits/322 using temp file hdfs://{path}/checkpoint/commits/.322.e7795a54-3ea1-435a-a5de-253f2e66d8fe.tmp
21/06/24 08:04:06 INFO streaming.CheckpointFileManager: Renamed temp file hdfs://{path}/checkpoint/commits/.322.e7795a54-3ea1-435a-a5de-253f2e66d8fe.tmp to hdfs://{path}/checkpoint/commits/322
21/06/24 08:04:06 INFO streaming.MicroBatchExecution: Streaming query made progress: {
  "id" : "85adfe77-fa91-4094-ba2a-1ac65dd193cc",
  "runId" : "f1012bb4-0c38-4b99-9bb9-7f02ed3f3087",
  "name" : null,
  "timestamp" : "2021-06-24T06:00:56.265Z",
  "batchId" : 322,
  "numInputRows" : 2101,
  "processedRowsPerSecond" : 11.066864721932513,
  "durationMs" : {
    "addBatch" : 184786,
    "getBatch" : 8,
    "getEndOffset" : 0,
    "queryPlanning" : 256,
    "setOffsetRange" : 3766,
    "triggerExecution" : 189845,
    "walCommit" : 716
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "startOffset" : {
      "topic" : {
        "2" : 2354732,
        "1" : 852378,
        "0" : 4484872
      }
    },
    "endOffset" : {
      "topic" : {
        "2" : 2355432,
        "1" : 852507,
        "0" : 4486272
      }
    },
    "numInputRows" : 2101,
    "processedRowsPerSecond" : 11.066864721932513
  } ],
  "sink" : {
    "description" : "HoodieStreamingSink[table]"
  }
}
```
The weird thing to me is that if I subtract `startOffset` from `endOffset`, I
get **2229**. Shouldn't this match the **2101** of `numInputRows`? In the other
runs of the application, I can see that the difference between the offsets is
always `numInputRows - 1`.
That would also be the case here if we only combined the differences for
partitions 2 and 0, which gives 2100. This makes me assume the messages I am
missing are in partition 1, whose offsets show a difference of 129.
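For reference, here are the per-partition deltas computed from the progress
output above (the values are copied from the log; this is arithmetic for
illustration, not application code):
```scala
// Offsets per partition, copied from the progress JSON above (illustration only)
val start = Map("2" -> 2354732L, "1" -> 852378L, "0" -> 4484872L)
val end   = Map("2" -> 2355432L, "1" -> 852507L, "0" -> 4486272L)

val deltas = end.map { case (p, e) => p -> (e - start(p)) }
// deltas = Map("2" -> 700, "1" -> 129, "0" -> 1400); sum = 2229
// partitions 2 and 0 alone: 700 + 1400 = 2100 = numInputRows - 1
```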
Now, since I effectively ran the same code twice and produced two different
outputs, I am wondering where the issue comes from and how to fix it. Is the
mismatch between the offsets and `numInputRows` above the actual issue,
indicating that I lost 128 messages? Does the problem originate from the Kafka
input or from the HoodieStreamingSink?
Sadly, I can't replicate the issue.
I've been thinking about getting the `numInputRows` and offsets from
`query.lastProgress` in my code, comparing them, and throwing an error on
mismatch. But that would only fail after the commit was already committed, so I
would have to manually rerun for the missing data.
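Roughly what I have in mind is the following sketch. It assumes a single Kafka
source and parses the offset JSON with json4s (which ships with Spark);
`verifyOffsetsMatchRows` is just an illustrative name:
```scala
import org.apache.spark.sql.streaming.StreamingQuery
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Sketch only: compare the Kafka offset range of the last micro-batch
// with numInputRows and fail loudly on a mismatch.
def verifyOffsetsMatchRows(query: StreamingQuery): Unit = {
  // Assumes exactly one source (the Kafka reader from the query above)
  val source = query.lastProgress.sources.head

  // startOffset/endOffset are JSON strings: {"topic":{"partition":offset,...}}
  def offsets(json: String): Map[String, Long] = parse(json) match {
    case JObject(topics) =>
      (for {
        (_, JObject(parts))    <- topics
        (partition, JInt(off)) <- parts
      } yield partition -> off.toLong).toMap
    case _ => Map.empty
  }

  val start  = offsets(source.startOffset)
  val deltas = offsets(source.endOffset).map { case (p, end) => p -> (end - start(p)) }

  if (deltas.values.sum != source.numInputRows)
    throw new IllegalStateException(
      s"Offset deltas $deltas sum to ${deltas.values.sum}, " +
        s"but numInputRows is ${source.numInputRows}")
}
```
As said, though, this would only detect the loss after the commit already
happened.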
**Environment Description**
* Hudi version : 0.8.0
* Spark version : 2.4.0
* Hadoop version : 3.0.0
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
**Additional context**
The missing message would have been ingested into Kafka shortly before the
query execution at 6:00 am and is about 40 KB in size.