SamLeurink opened a new issue, #32997:
URL: https://github.com/apache/beam/issues/32997

   ### What happened?
   
   This bug is quite a pickle and is something I've been looking at for a couple of hours now.
   Non-retained messages (at every QoS level) are not forwarded by MqttIO.Read after restoring from a checkpoint following a specific error: a StackOverflowError in `org.fusesource.mqtt.client`.
   
   I believe I've found the cause of the error: a gap in the messages received from the MQTT broker. When `connection.receive(1, TimeUnit.SECONDS)` is called, it expects to receive a Message within the next second. If no message arrives in that time, a promise with a callback is created and added to a queue in `org.fusesource.mqtt.client.FutureConnection`. When a message does arrive, one of two things happens: either the receive is handled before the await finishes, in which case receive returns the Message, or the receive is handled after the await finishes, in which case the message is added to the receive buffer for the next receive request. However, this "putback" method first wants to clear the promises that were made earlier before it can finally add the message to the buffer. Each promise calls its callback, which calls the "putback" method again, and this recursion results in the StackOverflowError once the queue contains around 600 Promise entries. With one Promise added per second, a StackOverflowError occurs when a Message is received after an idle period of roughly 10 minutes.
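
To make the mechanism above easier to follow, here is a minimal sketch of the recursion. It is a simplified model, not the fusesource client's code: the class, field, and method names (`PutbackRecursionSketch`, `pendingPromises`, `receiveTimedOut`, `deliver`, `putBack`) are hypothetical, and it counts nesting depth instead of actually overflowing the stack.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Simplified model only; these names are hypothetical and are not the
// fusesource client's actual classes or fields.
class PutbackRecursionSketch {
    private final Deque<Consumer<String>> pendingPromises = new ArrayDeque<>();
    private final Deque<String> receiveBuffer = new ArrayDeque<>();
    private int depth = 0;
    private int maxDepth = 0;

    // Models a receive that timed out: a promise is queued whose callback
    // puts the message back.
    void receiveTimedOut() {
        pendingPromises.addLast(this::putBack);
    }

    // Models a message arriving from the broker.
    void deliver(String message) {
        if (pendingPromises.isEmpty()) {
            receiveBuffer.addLast(message);
        } else {
            pendingPromises.removeFirst().accept(message);
        }
    }

    // Completes the next stale promise before buffering; that promise's
    // callback re-enters putBack, so the calls nest once per queued promise.
    void putBack(String message) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        if (pendingPromises.isEmpty()) {
            receiveBuffer.addFirst(message);
        } else {
            pendingPromises.removeFirst().accept(message);
        }
        depth--;
    }

    // Returns the deepest putBack nesting reached when one message arrives
    // after the given number of timed-out receives.
    static int maxPutBackDepth(int timedOutReceives) {
        PutbackRecursionSketch sketch = new PutbackRecursionSketch();
        for (int i = 0; i < timedOutReceives; i++) {
            sketch.receiveTimedOut();
        }
        sketch.deliver("payload");
        return sketch.maxDepth;
    }

    public static void main(String[] args) {
        System.out.println(maxPutBackDepth(600)); // prints 600
    }
}
```

In this model the nesting depth grows linearly with the number of timed-out receives, which matches the observation that the error shows up after a fixed idle period. The depth at which a real stack overflows depends on the thread stack size and the size of each frame, so the figure of about 600 is specific to the setup described here.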
   
   This error is mostly down to the implementation of the fusesource client, but it is something to be aware of, since the last commits on their GitHub repo were made more than 5 years ago and issues aren't currently being handled.
   
   However, this wouldn't be a big issue if it didn't cause something else to happen: after the checkpoint is restored, messages that aren't retained on the broker are no longer received. I haven't found the cause of this yet, but since a new instance of `MQTT` is created on every start, the only things that can be shared are the static variables, and the `blockingThreadPool` (and therefore `blockingExecutor`) is the only one.
   
   I'll have a further look to see if I can pinpoint the exact place where it's going wrong. Updates will be posted in this thread.
   
   Additional info about the setup that I'm running:
   Broker - Mosquitto (Standard Protocol - TCP)
   Beam version - 2.55.0 (Not the most up to date, but the code that causes 
this error should be the same)
   Runner - Flink (but likely a problem for all runners)
   MQTT Client version - 1.15 (Both on 2.55 and current)
   
   Below is the `StackOverflowError`:
   ![Screenshot from 2024-11-01 
14-08-33](https://github.com/user-attachments/assets/7347a53c-e5e0-4750-9609-f8364a0c7f83)
   
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


