Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/5282
@aljoscha this is currently the case for any startup mode. Any
partition discovered after the initial batch fetch is considered a new
partition resulting from a Kafka scale-out, and is therefore read from the
earliest record. Startup modes apply only to existing partitions.
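
To illustrate the rule, here is a minimal sketch in plain Java. It is not Flink's actual code: `PartitionStartSketch`, `Mode`, and `resolve` are hypothetical names chosen only to model the behavior described above.

```java
// Hypothetical model of the rule described above; not Flink's real classes.
final class PartitionStartSketch {

    // Illustrative subset of startup modes (names are assumptions).
    enum Mode { EARLIEST, LATEST, GROUP_OFFSETS }

    // Returns the position a partition would start reading from.
    // configuredMode: the startup mode set on the consumer.
    // discoveredAfterInitialFetch: true if the partition was not part of
    // the initial batch fetch, i.e. it is treated as a scale-out partition.
    static Mode resolve(Mode configuredMode, boolean discoveredAfterInitialFetch) {
        if (discoveredAfterInitialFetch) {
            // New partitions ignore the startup mode and start at the earliest record.
            return Mode.EARLIEST;
        }
        // Partitions known at startup follow the configured startup mode.
        return configuredMode;
    }
}
```

With this model, a consumer configured with `LATEST` would still read a later-discovered partition from its first record, while partitions present at startup begin at the latest offset.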
