With KafkaUnboundedReader, when a consumer is configured with a consumer 
group id, a call is made to fetch the consumer group's current position for 
each topic/partition pair. That offset is used as the initial offset for the 
read transform, which allows pipelines that persist offsets (either via the 
consumer's auto-commit or via the commit-offsets-in-finalize option) to later 
resume from the same point.
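For reference, the committed-offset lookup that the legacy reader effectively performs can be sketched with the plain Kafka consumer API. This is a minimal sketch, not Beam's actual code; the broker address, topic, and group id are placeholders:

```java
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CommittedOffsetLookup {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "my-group");                // the pipeline's consumer group (placeholder)
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic/partition
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // committed() returns the group's last committed offset per partition,
      // with a null entry when the group has no committed offset for it.
      Map<TopicPartition, OffsetAndMetadata> committed =
          consumer.committed(Set.of(tp), Duration.ofSeconds(10));
      OffsetAndMetadata meta = committed.get(tp);
      if (meta != null) {
        System.out.println("Resume from committed offset: " + meta.offset());
      } else {
        // No committed offset: fall back to the auto.offset.reset policy.
        System.out.println("No committed offset; falling back to auto.offset.reset");
      }
    }
  }
}
```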

With the SDF implementation (ReadSourceDescriptors/ReadFromKafkaDoFn) of the 
Kafka reader, no equivalent call is made to retrieve committed offsets. The SDF 
implementation instead uses the offset consumer to determine the initial offset. 
Because the offset consumer is configured to never commit offsets, the initial 
offset will always be one of earliest, latest, or an offset calculated from a 
specific time.

Users can provide ReadFromKafkaDoFn with a KafkaSourceDescriptor containing an 
explicitly retrieved initial offset, but if the processElement transform fails 
there is no way to resume from the last persisted offset; I believe the 
original offset will be used instead. In long-lived pipelines this will result 
in data duplication.
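If I am reading the KafkaSourceDescriptor API correctly, supplying an explicit start offset looks roughly like the sketch below (offset value, topic, and broker address are placeholders). The point is that the offset is fixed at descriptor-creation time, so a retried bundle restarts from it rather than from whatever was last committed:

```java
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.transforms.Create;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ExplicitOffsetRead {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // The start offset is baked into the descriptor. On a failure/retry of
    // processElement, reading restarts from this offset, not from the last
    // position the previous attempt reached.
    KafkaSourceDescriptor descriptor =
        KafkaSourceDescriptor.of(
            new TopicPartition("my-topic", 0), // placeholder topic/partition
            1234L,                             // explicit start offset (placeholder)
            null,                              // startReadTime
            null,                              // stopReadOffset
            null,                              // stopReadTime
            Collections.singletonList("localhost:9092")); // placeholder broker
    p.apply(Create.of(descriptor))
        .apply(
            KafkaIO.<byte[], byte[]>readSourceDescriptors()
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class));
  }
}
```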

Is this an intended change, or should I open an issue marking this as a 
regression in functionality? Thanks.
