gdmachado commented on issue #16016:
URL: https://github.com/apache/iceberg/issues/16016#issuecomment-4587580400
we hit this on iceberg-kafka-connect 1.10.1 in prod (REST catalog,
distributed mode, eager rebalancing).
symptom: a sink connector silently stops committing snapshots.
consumer offsets keep advancing because files still get written, so lag
looks fine, but no new snapshots land and the task log just loops an auth error
from our OAuth2 catalog client. that auth error is a red herring - it's a
`CoordinatorThread` that should have been stopped, still running against a
catalog whose session is closed.
root cause is the leaked `CoordinatorThread` this issue describes. there are
two ways into the leak though, and I think #16020 only closes one of them:
1. `terminate()` doesn't join (this is the one #16020 fixes).
`stopCoordinator()` calls `coordinatorThread.terminate()` but never joins, so
the coordinator can outlive close.
2. `close()` skips `stopCoordinator()` entirely during a rebalance (not
covered). `CommitterImpl.close()` only stops the coordinator if
`hasLeaderPartition()` is true:
```java
public void close(Collection<TopicPartition> closedPartitions) {
  ...
  if (hasLeaderPartition(closedPartitions)) {
    stopCoordinator();
  }
  ...
}
```
`hasLeaderPartition()` asks the consumer group who's leader and needs
`ConsumerGroupState.STABLE`. but `close()` runs during the rebalance that's
revoking the partitions, so the group isn't STABLE, the gate is false, and
`stopCoordinator()` never runs. the leader's coordinator thread gets orphaned,
the next assignment starts a fresh one, and now you have two coordinators, with
the old one looping against its closed catalog.
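To make the second path concrete, here is a minimal, self-contained model of it. It is only a sketch: `FakeCoordinator`, `FakeCommitter`, and the `GroupState` enum are hypothetical stand-ins invented for illustration, not the connector's classes, and the leader check is reduced to "is the group stable".

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class GatedCloseSketch {

    /** Hypothetical group states; only STABLE lets the leader check succeed. */
    enum GroupState { STABLE, PREPARING_REBALANCE }

    /** Stand-in for a coordinator thread: loops until told to stop. */
    static class FakeCoordinator extends Thread {
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        FakeCoordinator() {
            super("fake-coord");
            setDaemon(true); // never keeps the JVM alive in this sketch
        }

        @Override
        public void run() {
            while (!terminated.get()) {
                try {
                    Thread.sleep(10); // stands in for polling and commit work
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        void terminate() {
            terminated.set(true);
        }
    }

    /** Stand-in for the committer, with the gate modeled as described above. */
    static class FakeCommitter {
        FakeCoordinator coordinatorThread;

        void startCoordinator() {
            coordinatorThread = new FakeCoordinator();
            coordinatorThread.start();
        }

        // Models the leader check: it can only answer "yes" when the group is stable.
        boolean hasLeaderPartition(GroupState state) {
            return state == GroupState.STABLE;
        }

        void close(GroupState stateDuringClose) {
            if (hasLeaderPartition(stateDuringClose)) {
                coordinatorThread.terminate();
                coordinatorThread = null;
            }
        }
    }

    /** Returns true if the coordinator is still referenced and running after close(). */
    static boolean coordinatorSurvivesClose(GroupState stateDuringClose) {
        FakeCommitter committer = new FakeCommitter();
        committer.startCoordinator();
        FakeCoordinator started = committer.coordinatorThread;
        committer.close(stateDuringClose);
        boolean leaked = committer.coordinatorThread != null && started.isAlive();
        started.terminate(); // clean up the sketch's own thread
        return leaked;
    }

    public static void main(String[] args) {
        System.out.println("close during rebalance leaks: "
            + coordinatorSurvivesClose(GroupState.PREPARING_REBALANCE));
        System.out.println("close while stable leaks: "
            + coordinatorSurvivesClose(GroupState.STABLE));
    }
}
```

In this model the thread survives `close()` whenever the state passed in isn't `STABLE`, which is the shape of the leak described above.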
I repro'd it locally to be sure - docker-compose with KRaft kafka +
cp-kafka-connect, a real REST catalog, `tasks.max=3`, short commit interval,
forced task restarts to drive rebalances. counting "iceberg-coord" threads with
jstack (healthy is 1):
- stock 1.10.1: 4-5 coordinator threads after a few rebalances
- #16020 join alone, gate kept: still 5. the join doesn't help if
`stopCoordinator()` never gets called.
- gate removed + join: back to 1, stable across repeated rebalances.
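If you'd rather check from inside the JVM (say, from a test or a debug hook) than with jstack, the same count can be approximated with `Thread.getAllStackTraces()`. This is a sketch, not what the repro above used: the `CoordinatorThreadCount` class and its `startAndCount` helper are hypothetical, and the helper only spawns dummy threads so the example runs on its own.

```java
import java.util.concurrent.CountDownLatch;

public class CoordinatorThreadCount {

    /** Counts live threads in this JVM whose name starts with the given prefix. */
    static long countThreads(String namePrefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(t -> t.isAlive() && t.getName().startsWith(namePrefix))
            .count();
    }

    /** Demo helper: starts n parked daemon threads with the prefix, then counts them. */
    static long startAndCount(String namePrefix, int n) {
        CountDownLatch started = new CountDownLatch(n);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                started.countDown();
                try {
                    release.await(); // park until the count has been taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, namePrefix + "-" + i);
            t.setDaemon(true);
            t.start();
        }
        try {
            started.await(); // make sure every dummy thread is actually running
            return countThreads(namePrefix);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            release.countDown(); // let the dummy threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println("iceberg-coord threads: " + startAndCount("iceberg-coord", 3));
    }
}
```

In a real worker you'd call only `countThreads("iceberg-coord")`; anything above 1 on a task that should own a single coordinator points at the leak.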
the middle one is the point - join alone wasn't enough for us. the fix that
worked was dropping the `hasLeaderPartition()` gate and calling
`stopCoordinator()` unconditionally.
it's safe because `coordinatorThread` is only set on the task that actually
started a coordinator, so it's a no-op on non-leaders:
```java
public void close(Collection<TopicPartition> closedPartitions) {
  ...
  stopCoordinator(); // no-op if this task never started one
  stopWorker();
  KafkaUtils.seekToLastCommittedOffsets(context);
}
```
(plus the join inside `stopCoordinator()`, same as #16020).
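For reference, a rough sketch of what a null-safe `stopCoordinator()` with a join could look like. Everything here is an assumption made for illustration: `FakeCoordinator` and `FakeCommitter` are hypothetical stand-ins, the boolean return value exists only so the behavior is observable, and the 5 second join timeout is an arbitrary bound, not a value from #16020.

```java
public class StopCoordinatorSketch {

    static final long JOIN_TIMEOUT_MS = 5_000; // hypothetical bound, not from the PR

    /** Stand-in for a coordinator thread: loops until terminate() is called. */
    static class FakeCoordinator extends Thread {
        private volatile boolean terminated;

        FakeCoordinator() {
            super("fake-coord");
            setDaemon(true);
        }

        @Override
        public void run() {
            while (!terminated) {
                try {
                    Thread.sleep(10); // stands in for coordinator work
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        void terminate() {
            terminated = true;
        }
    }

    /** Stand-in for the committer; the field is null on tasks that never led. */
    static class FakeCommitter {
        private FakeCoordinator coordinatorThread;

        void startCoordinator() {
            coordinatorThread = new FakeCoordinator();
            coordinatorThread.start();
        }

        /** Returns true if a coordinator was stopped, false if there was nothing to stop. */
        boolean stopCoordinator() {
            FakeCoordinator thread = coordinatorThread;
            if (thread == null) {
                return false; // non-leader task: nothing to do
            }
            thread.terminate();
            try {
                thread.join(JOIN_TIMEOUT_MS); // wait so the thread cannot outlive close()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            coordinatorThread = null;
            return true;
        }

        void close() {
            stopCoordinator(); // unconditional, no leader gate
        }
    }

    static boolean nonLeaderStopIsNoOp() {
        return !new FakeCommitter().stopCoordinator();
    }

    static boolean leaderStopJoinsThread() {
        FakeCommitter committer = new FakeCommitter();
        committer.startCoordinator();
        Thread started = committer.coordinatorThread;
        boolean stopped = committer.stopCoordinator();
        return stopped && !started.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("non-leader stop is a no-op: " + nonLeaderStopIsNoOp());
        System.out.println("leader stop joins the thread: " + leaderStopJoinsThread());
    }
}
```

The null check is what makes the unconditional call cheap on non-leaders, and the bounded join keeps `close()` from hanging forever if the coordinator is stuck.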
I'm happy to contribute this upstream: either fold the gate removal into
@utafrali's #16020 or open a separate PR with the repro.