iht opened a new pull request, #38603:
URL: https://github.com/apache/beam/pull/38603

   This PR may solve #36991.

   ### Problem Statement

   The `SolaceIO` connector was losing data during Dataflow scaling and
   rebalancing events.

   - The original code acknowledged messages prematurely (inside `advance()`
     and `close()`), before Dataflow had committed them. If a work item failed
     after the messages had been read, they were already acked in Solace and
     could not be redelivered, so they were lost.
   - A previous fix attempt (#37007) removed these premature acks but
     introduced a regression in which the pipeline got "stuck": if a checkpoint
     finalization call (`finalizeCheckpoint`) was lost or delayed, the
     associated messages remained unacknowledged indefinitely on the Solace
     broker, eventually hitting the `max-delivered-unacked-msgs-per-flow` limit
     and halting message delivery. That fix was subsequently reverted in #37162.

   ### Proposed Solution

   This PR introduces a thread-safe, sequential finalization catch-up mechanism
   that acknowledges only committed messages, without leaking unacknowledged
   messages or stalling the pipeline when finalizations are lost.
     
   #### 1. Active Reader Tracking (`ActiveReadersRegistry`)

   We introduce a JVM-global `ActiveReadersRegistry` that tracks active
   `UnboundedSolaceReader` instances using `WeakReference`s.

   - When a `SolaceCheckpointMark` is serialized and sent to the runner, it
     loses its direct in-memory references.
   - Upon finalization, the mark uses a serializable `readerUuid` to resolve
     the active reader from the registry and delegates the acknowledgment to it.
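
The lookup described above could be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `SketchReader`, `ReaderRegistrySketch`, and `CheckpointMarkSketch` are hypothetical stand-ins for `UnboundedSolaceReader`, `ActiveReadersRegistry`, and `SolaceCheckpointMark`, and the boolean return value is an assumption added only to make the outcome observable.

```java
import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for UnboundedSolaceReader; only what the sketch needs.
class SketchReader {
  final String uuid = UUID.randomUUID().toString();
  long lastFinalizedCheckpoint = -1L;

  void finalizeCheckpoint(long checkpointId) {
    lastFinalizedCheckpoint = checkpointId;
  }
}

// JVM-global registry that holds readers weakly, so registration alone
// never keeps a reader alive.
final class ReaderRegistrySketch {
  private static final Map<String, WeakReference<SketchReader>> READERS =
      new ConcurrentHashMap<>();

  private ReaderRegistrySketch() {}

  static void register(SketchReader reader) {
    READERS.put(reader.uuid, new WeakReference<>(reader));
  }

  static void unregister(String uuid) {
    READERS.remove(uuid);
  }

  // Returns null if the reader was never registered, was unregistered,
  // or has been garbage collected.
  static SketchReader resolve(String uuid) {
    WeakReference<SketchReader> ref = READERS.get(uuid);
    if (ref == null) {
      return null;
    }
    SketchReader reader = ref.get();
    if (reader == null) {
      READERS.remove(uuid, ref); // drop the stale entry
    }
    return reader;
  }
}

// Hypothetical stand-in for SolaceCheckpointMark: it carries only
// serializable identifiers, never a direct reference to the reader.
final class CheckpointMarkSketch implements Serializable {
  private final String readerUuid;
  private final long checkpointId;

  CheckpointMarkSketch(String readerUuid, long checkpointId) {
    this.readerUuid = readerUuid;
    this.checkpointId = checkpointId;
  }

  // Returns true if a live reader was found and asked to finalize.
  boolean finalizeCheckpoint() {
    SketchReader reader = ReaderRegistrySketch.resolve(readerUuid);
    if (reader == null) {
      return false; // no live reader in this JVM, nothing to ack here
    }
    reader.finalizeCheckpoint(checkpointId);
    return true;
  }
}
```

In this sketch a mark whose reader has been closed or collected simply does nothing; how the real connector handles that case is not shown here.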
     
   #### 2. Sequential Catch-Up Finalization

   Instead of a shared queue, the reader now maintains a
   `TreeMap<Long, List<BytesXMLMessage>> pendingCheckpoints` that tracks the
   messages belonging to each individual checkpoint ID.

   - Sequential acks: When `finalizeCheckpoint(checkpointId)` is called, the
     reader acknowledges the messages for that specific checkpoint and for any
     older pending checkpoints (i.e., every `pendingId <= checkpointId`).
   - Self-healing: Since Dataflow (and other runners) commit state sequentially
     per split, if the finalization of checkpoint `T1` is lost but `T2`
     succeeds, the finalization of `T2` automatically "catches up" and
     acknowledges and cleans up the messages for `T1`. This prevents
     unacknowledged messages from accumulating on the broker.
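
The catch-up rule can be illustrated with a small single-threaded sketch. It is an assumption-laden illustration rather than the PR's code: `CatchUpSketch` and its methods are hypothetical names, `String` stands in for `BytesXMLMessage`, appending to `acked` stands in for `msg.ackMessage()`, and synchronization is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, single-threaded illustration of sequential catch-up.
final class CatchUpSketch {
  // checkpointId -> messages read for that checkpoint.
  private final TreeMap<Long, List<String>> pendingCheckpoints = new TreeMap<>();
  final List<String> acked = new ArrayList<>();

  void recordCheckpoint(long checkpointId, List<String> messages) {
    pendingCheckpoints.put(checkpointId, new ArrayList<>(messages));
  }

  void finalizeCheckpoint(long checkpointId) {
    // headMap(id, true) is a live view of every entry with key <= id,
    // so older checkpoints whose finalization was lost are included.
    NavigableMap<Long, List<String>> ready =
        pendingCheckpoints.headMap(checkpointId, true);
    for (List<String> batch : ready.values()) {
      acked.addAll(batch); // stand-in for acknowledging each message
    }
    ready.clear(); // clearing the view removes the entries from the TreeMap
  }

  int pendingCheckpointCount() {
    return pendingCheckpoints.size();
  }
}
```

For example, if checkpoint 1 holds three messages and checkpoint 2 holds four (a hypothetical split), calling `finalizeCheckpoint(2)` without ever finalizing checkpoint 1 acknowledges all seven in checkpoint order and leaves later checkpoints pending.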
     
   #### 3. Concurrency & Thread Safety

   To avoid blocking the critical reader thread (which calls `advance` and
   `getCheckpointMark`) while the finalizer thread performs network operations:

   - We use very short synchronized blocks that only copy state (extracting
     the pending messages into a local list).
   - All actual Solace network I/O (`msg.ackMessage()`) is performed outside
     the synchronized blocks, so the reader thread never waits on a network
     call made under the lock.
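
The locking pattern described above might look roughly like the sketch below. All names are hypothetical (`AckableSketch` stands in for a Solace message exposing `ackMessage()`, and `snapshot` is an invented helper that plays the role of `getCheckpointMark`); it only shows the idea of copying under the lock and acknowledging outside it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a Solace message.
interface AckableSketch {
  void ackMessage();
}

final class LockCopySketch {
  private final Object lock = new Object();
  private final List<AckableSketch> receivedMessages = new ArrayList<>();
  private final TreeMap<Long, List<AckableSketch>> pendingCheckpoints = new TreeMap<>();

  // Reader thread: record a message that was just read.
  void onMessage(AckableSketch msg) {
    synchronized (lock) {
      receivedMessages.add(msg);
    }
  }

  // Reader thread: move everything read so far under the given checkpoint ID.
  void snapshot(long checkpointId) {
    synchronized (lock) {
      pendingCheckpoints.put(checkpointId, new ArrayList<>(receivedMessages));
      receivedMessages.clear();
    }
  }

  // Finalizer thread: copy under the lock, acknowledge outside it.
  void finalizeCheckpoint(long checkpointId) {
    List<AckableSketch> toAck = new ArrayList<>();
    synchronized (lock) {
      Map<Long, List<AckableSketch>> ready =
          pendingCheckpoints.headMap(checkpointId, true);
      for (List<AckableSketch> batch : ready.values()) {
        toAck.addAll(batch);
      }
      ready.clear();
    }
    // Potentially slow network calls happen here, with the lock released.
    for (AckableSketch msg : toAck) {
      msg.ackMessage();
    }
  }
}
```

The trade-off in this sketch is that messages leave `pendingCheckpoints` before they are actually acknowledged, so a failure during the ack loop would need separate handling, which is not shown.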
   
   ### Changes Made

   1. `ActiveReadersRegistry.java` (new): Added a thread-safe,
      weak-reference-based global registry for active readers.
   2. `UnboundedSolaceReader.java`:
      - Removed the `safeToAckMessages` queue and the premature
        `finalizeReadyMessages()` helper.
      - Added `pendingCheckpoints` (`TreeMap`) and sequential
        `finalizeCheckpoint(long)` logic that performs acks outside the lock.
      - Added synchronized protection around `receivedMessages` in
        `advance()`.
      - Registers with `ActiveReadersRegistry` post-construction and
        unregisters in `close()`.
   3. `SolaceCheckpointMark.java`:
      - Removed the transient `safeToAck` queue.
      - Added serializable `readerUuid` (String) and `checkpointId` (long)
        fields.
      - Delegates `finalizeCheckpoint()` to the resolved reader via the
        registry.
   4. `UnboundedSolaceSource.java`:
      - Modified `createReader` to register the reader immediately after
        construction, avoiding Checker Framework initialization-order warnings.
   5. `SolaceIOReadTest.java`:
      - Updated `testCheckpointMarkAndFinalizeSeparately` to reflect that
        `advance()` no longer triggers premature acks.
      - Added `testLostCheckpointCatchUp`: explicitly simulates a lost
        finalization (T1 lost, T2 finalized) and verifies that both T1 and T2
        messages (7 messages in total) are acknowledged, confirming that the
        catch-up behavior works.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
