Chris Pettitt created KAFKA-8816:
------------------------------------
Summary: RecordCollector offsets updated indirectly by StreamTask
Key: KAFKA-8816
URL: https://issues.apache.org/jira/browse/KAFKA-8816
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Chris Pettitt
Assignee: Chris Pettitt
Currently it is possible to indirectly update the offsets in
RecordCollectorImpl via the offset read function:
{code:java}
@Override
public Map<TopicPartition, Long> offsets() {
    return offsets;
}
{code}
Here, offsets is a private final field in RecordCollectorImpl. It
appears that the intent is for this field to be updated only when the producer
acknowledges an offset. However, because it is handed back in a mutable form,
it is possible to update offsets through this call, as actually happens today
in StreamTask:
{code:java}
protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
    final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
    for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
        checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
    }
    return checkpointableOffsets;
}
{code}
Here it is possible to set a new checkpoint if the topic partition is not
already in the offsets map, which happens for the input topic when we're using
optimized topologies and a KTable. Because the entry is written into the
collector's own map, every later call finds the key already present and
putIfAbsent leaves it untouched. The effect is that we continue to checkpoint
the first offset seen.
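The pinning effect can be reproduced outside of Kafka. The following is only a minimal sketch, not the actual Streams code: LeakyCollector is a hypothetical stand-in for RecordCollectorImpl, String keys stand in for TopicPartition, and the partition name "input-0" is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class SharedOffsetsDemo {

    // Hypothetical stand-in for RecordCollectorImpl: returns its internal map directly.
    public static final class LeakyCollector {
        private final Map<String, Long> offsets = new HashMap<>();

        public Map<String, Long> offsets() {
            return offsets;
        }
    }

    // Same shape as activeTaskCheckpointableOffsets() in the report:
    // it mutates whatever map the collector hands back.
    public static Map<String, Long> checkpointableOffsets(final LeakyCollector collector,
                                                          final Map<String, Long> consumedOffsets) {
        final Map<String, Long> checkpointable = collector.offsets();
        for (final Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            checkpointable.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return checkpointable;
    }

    public static void main(final String[] args) {
        final LeakyCollector collector = new LeakyCollector();
        final Map<String, Long> consumed = new HashMap<>();

        // First checkpoint: the consumed offset leaks into the collector's map.
        consumed.put("input-0", 5L);
        System.out.println(checkpointableOffsets(collector, consumed)); // {input-0=5}

        // The consumer has advanced, but the stale entry blocks putIfAbsent.
        consumed.put("input-0", 42L);
        System.out.println(checkpointableOffsets(collector, consumed)); // {input-0=5}
    }
}
```

The second call still reports offset 5 even though the consumed offset has moved to 42, which matches the behavior described above.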
It seems the correct behavior would be to return a read-only view of the
offsets from RecordCollectorImpl and create a copy of the returned map in
activeTaskCheckpointableOffsets before we mutate it.
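
One way the proposed change could look is sketched below. This is an illustration under assumptions rather than the actual patch: Collector and its onAck method are hypothetical stand-ins for RecordCollectorImpl and its producer-acknowledgement path, and String keys again stand in for TopicPartition.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ReadOnlyOffsetsSketch {

    // Hypothetical stand-in for RecordCollectorImpl.
    public static final class Collector {
        private final Map<String, Long> offsets = new HashMap<>();

        // Assumed to be the only write path (called on producer acknowledgement).
        public void onAck(final String partition, final long offset) {
            offsets.put(partition, offset);
        }

        // Read-only view: callers see updates but cannot modify the map.
        public Map<String, Long> offsets() {
            return Collections.unmodifiableMap(offsets);
        }
    }

    // Copies the collector's offsets before merging in the consumed offsets.
    public static Map<String, Long> checkpointableOffsets(final Collector collector,
                                                          final Map<String, Long> consumedOffsets) {
        final Map<String, Long> checkpointable = new HashMap<>(collector.offsets());
        for (final Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            checkpointable.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return checkpointable;
    }

    public static void main(final String[] args) {
        final Collector collector = new Collector();
        collector.onAck("output-0", 7L);

        final Map<String, Long> consumed = new HashMap<>();
        consumed.put("input-0", 5L);
        System.out.println(checkpointableOffsets(collector, consumed).get("input-0")); // 5

        // The consumed offset advances and the next checkpoint reflects it.
        consumed.put("input-0", 42L);
        System.out.println(checkpointableOffsets(collector, consumed).get("input-0")); // 42
    }
}
```

With the copy in place, the collector's map only ever contains acknowledged offsets, and any accidental write through offsets() fails with UnsupportedOperationException instead of silently changing state.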