iht opened a new pull request, #38603:
URL: https://github.com/apache/beam/pull/38603
This PR may solve #36991.
### Problem Statement
The SolaceIO connector was experiencing data loss during Dataflow
scaling/rebalancing events.
- The original code acknowledged messages prematurely (inside `advance()`
and `close()`), before Dataflow had committed them. If a work item failed
after being read, those messages were already acked in Solace and could not be
redelivered, leading to data loss.
- A previous fix attempt (#37007) removed these premature acks but
introduced a regression where the pipeline got "stuck." If a checkpoint
finalization call (`finalizeCheckpoint`) was lost or delayed, the associated
messages remained unacknowledged indefinitely on the Solace broker, eventually
hitting the `max-delivered-unacked-msgs-per-flow` limit and halting message
delivery. That fix was subsequently reverted in #37162.
### Proposed Solution
This PR introduces a thread-safe, sequential finalization catch-up mechanism.
It acknowledges only messages that the runner has committed, and it avoids
both leaked unacked messages and a stalled flow when a finalization is lost.
#### 1. Active Reader Tracking (`ActiveReadersRegistry`)
We introduce a JVM-global `ActiveReadersRegistry` that tracks active
`UnboundedSolaceReader` instances using `WeakReference`s.
- When `SolaceCheckpointMark` is serialized and sent to the runner, it
loses direct in-memory references.
- Upon finalization, the mark uses a serializable `readerUuid` to resolve
the active reader from the registry and delegate the acknowledgment.
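
The lookup described above can be sketched roughly as follows. This is an illustrative sketch, not the code in this PR: `FinalizableReader` and `ReaderRegistrySketch` are hypothetical names standing in for `UnboundedSolaceReader` and `ActiveReadersRegistry`, and the method names are assumptions.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the reader; only the finalize hook matters here.
interface FinalizableReader {
  void finalizeCheckpoint(long checkpointId);
}

// Sketch of a JVM-global registry keyed by a serializable reader UUID.
final class ReaderRegistrySketch {
  private static final Map<String, WeakReference<FinalizableReader>> READERS =
      new ConcurrentHashMap<>();

  private ReaderRegistrySketch() {}

  static void register(String readerUuid, FinalizableReader reader) {
    // A weak reference lets a forgotten reader still be garbage collected.
    READERS.put(readerUuid, new WeakReference<>(reader));
  }

  static void unregister(String readerUuid) {
    READERS.remove(readerUuid);
  }

  // Returns null when the reader was unregistered or garbage collected.
  static FinalizableReader resolve(String readerUuid) {
    WeakReference<FinalizableReader> ref = READERS.get(readerUuid);
    if (ref == null) {
      return null;
    }
    FinalizableReader reader = ref.get();
    if (reader == null) {
      READERS.remove(readerUuid, ref); // drop the stale entry
    }
    return reader;
  }
}
```

A deserialized checkpoint mark would then carry only the UUID and the checkpoint ID, call `resolve`, and skip the ack when the result is `null`.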
#### 2. Sequential Catch-Up Finalization
Instead of a shared queue, the reader now maintains a `TreeMap<Long,
List<BytesXMLMessage>> pendingCheckpoints` to track messages per individual
checkpoint ID.
- Sequential Acks: When `finalizeCheckpoint(checkpointId)` is called, the
reader acknowledges the messages for that specific checkpoint and any older
pending checkpoints (i.e., every `pendingId <= checkpointId`).
- Self-Healing: Since Dataflow (and other runners) commits state
sequentially per split, if checkpoint T1 finalization is lost but T2
succeeds, the finalization of T2 will automatically "catch up" and
acknowledge/clean up the messages for T1. Unacknowledged messages therefore
do not accumulate on the broker as long as a later finalization arrives.
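
A minimal sketch of the catch-up step, assuming messages are represented as plain strings and acks are just recorded in a list. `CatchUpSketch` and its methods are hypothetical names; the real reader works with `BytesXMLMessage` and adds synchronization, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Single-threaded illustration of "ack this checkpoint and everything older".
final class CatchUpSketch {
  private final TreeMap<Long, List<String>> pendingCheckpoints = new TreeMap<>();
  private final List<String> acked = new ArrayList<>();

  // Called when a checkpoint mark is created for the given batch of messages.
  void recordCheckpoint(long checkpointId, List<String> messages) {
    pendingCheckpoints.put(checkpointId, new ArrayList<>(messages));
  }

  void finalizeCheckpoint(long checkpointId) {
    // headMap(key, true) is a live view of all entries with id <= checkpointId.
    NavigableMap<Long, List<String>> ready =
        pendingCheckpoints.headMap(checkpointId, true);
    for (List<String> batch : ready.values()) {
      acked.addAll(batch); // stand-in for msg.ackMessage()
    }
    ready.clear(); // clearing the view removes the entries from the backing map
  }

  int ackedCount() {
    return acked.size();
  }

  int pendingCount() {
    return pendingCheckpoints.size();
  }
}
```

If checkpoints 1, 2, and 3 are recorded and only `finalizeCheckpoint(2)` is ever called, the messages of checkpoints 1 and 2 are acked and checkpoint 3 stays pending.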
#### 3. Concurrency & Thread Safety
To avoid blocking the critical reader thread (which calls `advance` and
`getCheckpointMark`) during network operations in the finalizer thread:
- We use very short synchronized blocks that only copy state (extracting
pending messages to a local list).
- All actual Solace network I/O (`msg.ackMessage()`) is performed outside
the synchronized blocks, so the reader thread never waits on a network call.
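
The locking pattern can be sketched as below, under the same simplifying assumptions as before: messages are strings, and the ack is an injected `Consumer<String>` standing in for `msg.ackMessage()`. `NonBlockingFinalizerSketch` is a hypothetical name, not a class in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

final class NonBlockingFinalizerSketch {
  private final Object lock = new Object();
  private final TreeMap<Long, List<String>> pendingCheckpoints = new TreeMap<>();
  private final Consumer<String> acker; // stand-in for the network ack

  NonBlockingFinalizerSketch(Consumer<String> acker) {
    this.acker = acker;
  }

  // Reader thread: only a map insert happens under the lock.
  void recordCheckpoint(long checkpointId, List<String> messages) {
    synchronized (lock) {
      pendingCheckpoints.put(checkpointId, new ArrayList<>(messages));
    }
  }

  // Finalizer thread: copy under the lock, ack after releasing it.
  void finalizeCheckpoint(long checkpointId) {
    List<String> toAck = new ArrayList<>();
    synchronized (lock) {
      NavigableMap<Long, List<String>> ready =
          pendingCheckpoints.headMap(checkpointId, true);
      for (List<String> batch : ready.values()) {
        toAck.addAll(batch);
      }
      ready.clear();
    }
    // Potentially slow I/O runs here, with the lock already released.
    for (String msg : toAck) {
      acker.accept(msg);
    }
  }

  int pendingCount() {
    synchronized (lock) {
      return pendingCheckpoints.size();
    }
  }
}
```

The trade-off in this sketch is that a message is removed from the pending map before its ack completes, so an ack that throws would need separate handling.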
### Changes Made
1. `ActiveReadersRegistry.java` (New): Added a thread-safe,
weak-reference-based global registry for active readers.
2. `UnboundedSolaceReader.java`:
   - Removed the `safeToAckMessages` queue and the premature
   `finalizeReadyMessages()` helper.
   - Added `pendingCheckpoints` (`TreeMap`) and sequential
   `finalizeCheckpoint(long)` logic that acks outside the synchronized blocks.
   - Added synchronized protection around `receivedMessages` in
   `advance()`.
   - Registers with `ActiveReadersRegistry` post-construction and
   unregisters in `close()`.
3. `SolaceCheckpointMark.java`:
   - Removed the transient `safeToAck` queue.
   - Added serializable `readerUuid` (String) and `checkpointId` (long)
   fields.
   - Delegates `finalizeCheckpoint()` to the resolved reader via the
   registry.
4. `UnboundedSolaceSource.java`:
   - Modified `createReader` to register the reader immediately after
   construction, avoiding Checker Framework initialization order warnings.
5. `SolaceIOReadTest.java`:
   - Updated `testCheckpointMarkAndFinalizeSeparately` to align with the
   fact that `advance()` no longer triggers premature acks.
   - Added `testLostCheckpointCatchUp`: Explicitly simulates a lost
   finalization (T1 lost, T2 finalized) and verifies that both T1 and T2 messages
   (7 messages in total) are successfully acknowledged, confirming the catch-up
   behavior works.