gdmachado commented on issue #16016:
URL: https://github.com/apache/iceberg/issues/16016#issuecomment-4587580400

   We hit this on iceberg-kafka-connect 1.10.1 in prod (REST catalog, distributed mode, eager rebalancing).
   
   Symptom: a sink connector silently stops committing snapshots.
   
   Consumer offsets keep advancing because files still get written, so lag looks fine. But no new snapshots land, and the task log just loops on an auth error from our OAuth2 catalog client. That auth error is a red herring: it comes from a `CoordinatorThread` that should have been stopped and is still running against a catalog whose session is closed.
   
   The root cause is the leaked `CoordinatorThread` this issue describes. There are two ways into the leak, though, and I think #16020 only closes one of them:
   
   1. `terminate()` doesn't join (this is the one #16020 fixes). `stopCoordinator()` calls `coordinatorThread.terminate()` but never joins, so the coordinator can outlive `close()`.
   
   2. `close()` skips `stopCoordinator()` entirely during a rebalance (not covered by #16020). `CommitterImpl.close()` only stops the coordinator if `hasLeaderPartition()` returns true:
   
   ```java
   public void close(Collection<TopicPartition> closedPartitions) {
       ...
       if (hasLeaderPartition(closedPartitions)) {
           stopCoordinator();
       }
       ...
   }
   ```
   
   `hasLeaderPartition()` asks the consumer group who the leader is and requires `ConsumerGroupState.STABLE`. But `close()` runs during the very rebalance that is revoking the partitions, so the group isn't STABLE, the gate evaluates to false, and `stopCoordinator()` never runs. The leader's coordinator thread is orphaned, the next assignment starts a fresh one, and now there are two coordinators, with the old one looping against its closed catalog.
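   A toy model of the gate, to make the counting concrete. Everything here is hypothetical: the `Coordinator` class, `runningAfterRebalances`, and the two-value state enum (its names borrowed from Kafka's `ConsumerGroupState`). It also assumes the same task stays leader across every rebalance, and it is not the real `CommitterImpl`. With the gate, `close()` always sees a non-STABLE group, so no coordinator is ever stopped and each rebalance adds one more. Without it, only the newest coordinator survives.

```java
import java.util.ArrayList;
import java.util.List;

public class GateLeakSketch {

  // Two of Kafka's consumer group states (hypothetical subset); the gate only passes on STABLE.
  enum GroupState { STABLE, PREPARING_REBALANCE }

  // Stand-in for a coordinator thread; "running" flips to false once it is stopped.
  static final class Coordinator {
    boolean running = true;
  }

  // Models the gate: leadership can only be confirmed while the group is STABLE.
  static boolean hasLeaderPartition(boolean isLeader, GroupState state) {
    return isLeader && state == GroupState.STABLE;
  }

  // Simulates the leader task going through n revoke/assign cycles and
  // returns how many coordinators are still running at the end.
  static long runningAfterRebalances(int rebalances, boolean gated) {
    List<Coordinator> started = new ArrayList<>();
    Coordinator current = new Coordinator();
    started.add(current);
    for (int i = 0; i < rebalances; i++) {
      // close() runs inside the rebalance that revokes the partitions.
      GroupState stateDuringClose = GroupState.PREPARING_REBALANCE;
      boolean stop = !gated || hasLeaderPartition(true, stateDuringClose);
      if (stop) {
        current.running = false;
      }
      // The next assignment starts a fresh coordinator either way.
      current = new Coordinator();
      started.add(current);
    }
    return started.stream().filter(c -> c.running).count();
  }

  public static void main(String[] args) {
    System.out.println("gated:   " + runningAfterRebalances(4, true));
    System.out.println("ungated: " + runningAfterRebalances(4, false));
  }
}
```

   For four rebalances, `main` prints `gated:   5` and `ungated: 1`. This is only a model of the control flow, not a measurement of the connector.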
   
   I reproduced it locally to be sure: docker-compose with KRaft Kafka + cp-kafka-connect, a real REST catalog, `tasks.max=3`, a short commit interval, and forced task restarts to drive rebalances. Counting "iceberg-coord" threads with jstack (healthy is 1):
   
   - stock 1.10.1: 4-5 coordinator threads after a few rebalances
   - #16020 join alone, gate kept: still 5. The join doesn't help if `stopCoordinator()` never gets called.
   - gate removed + join: back to 1, stable across repeated rebalances.
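   If attaching jstack to the worker is awkward (for example, in an integration test that runs Connect in-process), the same count can probably be taken from inside the JVM with `Thread.getAllStackTraces()`. This is a minimal sketch. `startNamed` is a hypothetical helper that only creates dummy threads so the example runs on its own, and the "iceberg-coord" fragment comes from the jstack counts in this comment, not from the connector's source.

```java
import java.util.concurrent.CountDownLatch;

public class CoordThreadCount {

  // Counts live threads in this JVM whose name contains nameFragment.
  // Thread.getAllStackTraces() returns a snapshot keyed by every live thread.
  static long countThreads(String nameFragment) {
    return Thread.getAllStackTraces().keySet().stream()
        .filter(t -> t.isAlive() && t.getName().contains(nameFragment))
        .count();
  }

  // Hypothetical demo helper: starts n daemon threads named prefix + index
  // that block until the latch is released.
  static void startNamed(int n, String prefix, CountDownLatch release) {
    for (int i = 0; i < n; i++) {
      Thread t = new Thread(() -> {
        try {
          release.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, prefix + i);
      t.setDaemon(true);
      t.start();
    }
  }

  public static void main(String[] args) {
    CountDownLatch release = new CountDownLatch(1);
    startNamed(2, "iceberg-coord-demo-", release);
    System.out.println("coordinator-like threads: " + countThreads("iceberg-coord"));
    release.countDown();
  }
}
```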
   
   The middle result is the point: the join alone wasn't enough for us. The fix that worked was dropping the `hasLeaderPartition()` gate and calling `stopCoordinator()` unconditionally.
   
   It's safe because `coordinatorThread` is only set on the task that actually started a coordinator, so the call is a no-op on non-leaders:
   
   ```java
   public void close(Collection<TopicPartition> closedPartitions) {
       ...
       stopCoordinator();   // no-op if this task never started one
       stopWorker();
       KafkaUtils.seekToLastCommittedOffsets(context);
   }
   ```
   
   (plus the join inside `stopCoordinator()`, same as #16020).
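   Here is a sketch of how the two changes could fit together: an ungated `close()` plus a bounded join in `stopCoordinator()`. The `CoordinatorThread` is a stand-in loop, not the connector's class. `JOIN_TIMEOUT_MS` and the `coordinator()` accessor are hypothetical, and the actual timeout and shutdown handling in #16020 may differ.

```java
public class StopCoordinatorSketch {

  // Hypothetical bound on how long close() waits for the coordinator to exit.
  static final long JOIN_TIMEOUT_MS = 5_000;

  // Stand-in for the connector's CoordinatorThread: loops until terminate() is called.
  static final class CoordinatorThread extends Thread {
    private volatile boolean terminated;

    CoordinatorThread() {
      super("iceberg-coord");
      setDaemon(true);
    }

    @Override
    public void run() {
      while (!terminated) {
        try {
          Thread.sleep(10); // placeholder for one commit-loop iteration
        } catch (InterruptedException e) {
          return; // treat interruption as a request to stop
        }
      }
    }

    void terminate() {
      terminated = true;
    }
  }

  // Hypothetical committer: only the task that started a coordinator holds a reference.
  static final class Committer {
    private CoordinatorThread coordinatorThread;

    void startCoordinator() {
      coordinatorThread = new CoordinatorThread();
      coordinatorThread.start();
    }

    CoordinatorThread coordinator() {
      return coordinatorThread;
    }

    // No leader gate: safe to call on every task.
    void close() {
      stopCoordinator();
      // stopWorker() and the offset seek would follow here in the real code.
    }

    private void stopCoordinator() {
      CoordinatorThread thread = coordinatorThread;
      if (thread == null) {
        return; // this task never started a coordinator
      }
      coordinatorThread = null;
      thread.terminate();
      try {
        thread.join(JOIN_TIMEOUT_MS); // don't let the coordinator outlive close()
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      }
    }
  }

  public static void main(String[] args) {
    Committer leader = new Committer();
    leader.startCoordinator();
    Thread coord = leader.coordinator();
    leader.close();
    System.out.println("coordinator alive after close: " + coord.isAlive());

    new Committer().close(); // non-leader: no-op
  }
}
```

   The field is cleared before the join, so a repeated `close()` (or a `close()` on a non-leader) sees `null` and returns immediately. Restoring the interrupt flag, rather than rethrowing, keeps `close()` free of checked exceptions, matching the `close()` signature shown in this comment.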
   
   I'm happy to contribute this upstream: either fold the gate removal into @utafrali's #16020 or open a separate PR with the repro.

