With KafkaUnboundedReader, when the consumer is configured with a consumer group id, a call is made to fetch the consumer group's current position for each topic/partition pair. That offset is used as the initial offset for the read transform, so pipelines that persist offsets (either via the consumer's auto-commit setting or via the commit-offsets-in-finalize option) can later resume from the same point.
With the SDF implementation of the Kafka reader (ReadSourceDescriptors/ReadFromKafkaDoFn), no equivalent call is made to retrieve committed offsets. Instead, the SDF implementation uses the offset consumer to determine the initial offset, and because the offset consumer is configured to never commit offsets, the result is always one of earliest, latest, or an offset resolved from a specific timestamp. Users can pass ReadFromKafkaDoFn a KafkaSourceDescriptor with an explicitly retrieved initial offset, but if the processElement transform fails there is no way to resume from the last persisted offset; I believe the original offset is used instead. In long-lived pipelines this results in duplicated data. Is this an intended change, or should I open an issue marking this as a regression in functionality? Thanks.
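To make the difference concrete, here is a minimal, hypothetical Java sketch of the two start-offset resolution paths as described above. All names here are invented for illustration and are not Beam's actual API; it only models the decision logic, not the Kafka calls themselves:

```java
import java.util.Optional;

// Hypothetical model of the two start-offset resolution paths; the class
// and method names are illustrative and do not exist in Beam.
public class StartOffsetModel {

    // Legacy KafkaUnboundedReader path: prefer the committed consumer-group
    // offset when one exists, otherwise fall back to the offset implied by
    // the reset policy (earliest/latest/timestamp-derived).
    public static long legacyStartOffset(Optional<Long> committedGroupOffset,
                                         long resetPolicyOffset) {
        return committedGroupOffset.orElse(resetPolicyOffset);
    }

    // SDF (ReadFromKafkaDoFn) path: only the descriptor's explicit offset or
    // the reset policy is consulted; committed group offsets are never read,
    // so a restart after failure resolves to the same offset as the first run.
    public static long sdfStartOffset(Optional<Long> descriptorOffset,
                                      long resetPolicyOffset) {
        return descriptorOffset.orElse(resetPolicyOffset);
    }

    public static void main(String[] args) {
        // Legacy reader restarting with a committed group offset resumes at 42.
        if (legacyStartOffset(Optional.of(42L), 0L) != 42L) {
            throw new AssertionError();
        }
        // SDF restart with a descriptor pinned at offset 10 re-reads from 10,
        // even if the group had already progressed past it -> duplicated data.
        if (sdfStartOffset(Optional.of(10L), 0L) != 10L) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```

The point of the sketch is that the SDF path has no parameter corresponding to the committed group offset at all, which is the gap being asked about.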
