zachjsh opened a new issue, #16189:
URL: https://github.com/apache/druid/issues/16189

   In SeekableStreamSupervisor::getOffsetsFromMetadataStorage(), we do:
   ```
           if (!ioConfig.getStream().equals(partitions.getStream())) {
             log.warn(
                 "Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences",
                 partitions.getStream(),
                 ioConfig.getStream()
             );
             return Collections.emptyMap();
           }
   ```
   
   This code is inherited by the Kafka supervisor. Here, ioConfig comes from the 
ingestion spec, and partitions holds the stored checkpoints for this datasource 
from the metadata store. The check tests whether ioConfig.getStream() is the 
same string as partitions.getStream(), and if it isn’t, the stored offsets are 
ignored. This interacts awkwardly with Kafka’s multi-topic support:
   
   ioConfig.getStream() is either topic (name) or topicPattern (pattern), 
whichever is set. (It’s an error for both to be set.) Therefore, it represents 
either a single, static topic name or a regular expression pattern.
   
   partitions.getStream() is the ioConfig.getStream() that was used for the 
last streaming ingestion job on this datasource. As above, this means it can be 
a name or a pattern.
   
   As a result, stored offsets are ignored in several situations, which can 
cause inadvertent data duplication:
   
   Converting from a topic name to a pattern, even if the pattern matches the 
previous name. For example, converting from topic: vehicle to topicPattern: 
vehicle.*.
   
   The inverse situation, converting from a topic pattern to a name, even if 
the name matches the previous pattern. For example, converting from 
topicPattern: `topicName.*` to topic: `topicName`.
   
   Changing the pattern where the set of topics selected by the old pattern and 
the new pattern overlap. For example, changing from topicPattern: foo-.* to 
topicPattern: foo-x-.* when there are topics like foo-1, foo-x-1, and foo-x-2 
would ignore the stored offsets for foo-x-1 and foo-x-2, even though they 
matched both before and after.
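
   To make the overlap case concrete, here is a minimal sketch that lists the 
topics matched by both the old and the new pattern. The class and method names 
are hypothetical (not Druid code), and it assumes full-match regex semantics 
for topicPattern.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Druid.
class TopicPatternOverlap {
    // Returns the topics that are fully matched by both the old and the new pattern.
    // Assumption: a topic is selected only when the whole name matches the regex.
    static List<String> matchedByBoth(List<String> topics, String oldPattern, String newPattern) {
        Pattern oldP = Pattern.compile(oldPattern);
        Pattern newP = Pattern.compile(newPattern);
        return topics.stream()
            .filter(t -> oldP.matcher(t).matches() && newP.matcher(t).matches())
            .collect(Collectors.toList());
    }
}
```

   With topics foo-1, foo-x-1, and foo-x-2, calling matchedByBoth with foo-.* 
and foo-x-.* returns foo-x-1 and foo-x-2. These are the topics whose stored 
offsets could safely be kept across the spec change.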
   
   We can improve this for Kafka multi-topic by allowing Kafka to override this 
method (or some protected/public method it calls). We have to handle:
   
   Topic name → pattern – The checkpoints don’t have a topic name in the 
partition keys, so if the partitions container’s getStream() matches the new 
pattern, we need to update the in-memory map to populate this field in the 
partition key with the value from getStream().
   
   Pattern → topic name – The checkpoints have a topic name in the partition 
keys, and we need to filter the map by just those entries where the partition 
key’s topic name exactly matches the new topic name. We ignore the container’s 
getStream().
   
   Pattern → pattern – The checkpoints have a topic name in the partition keys, 
and we need to filter the map by just those entries whose key matches the new 
pattern. We ignore the container’s getStream().
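
   The three transitions above could be sketched roughly as follows. This is an 
illustration under stated assumptions, not the actual Druid implementation: 
PartitionKey, StoredOffsetReconciler, and the storedIsPattern/newIsPattern 
flags are hypothetical, and how the real code would learn whether the stored 
stream was a name or a pattern is left open. Full-match regex semantics are 
assumed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch; names and types are not taken from Druid.
class StoredOffsetReconciler {
    // Hypothetical partition key. topic is null when the checkpoint was
    // written in single-topic mode (no topic name in the key).
    record PartitionKey(String topic, int partition) {}

    // stored:       offsets loaded from metadata storage
    // storedStream: the stored partitions.getStream() value
    // newStream:    the new ioConfig.getStream() value
    // The two flags say whether each stream value is a topicPattern (assumed known).
    static Map<PartitionKey, Long> reconcile(
        Map<PartitionKey, Long> stored,
        String storedStream,
        boolean storedIsPattern,
        String newStream,
        boolean newIsPattern
    ) {
        Map<PartitionKey, Long> result = new HashMap<>();
        if (!storedIsPattern && !newIsPattern) {
            // Name -> name: keep the existing exact-equality behavior.
            if (storedStream.equals(newStream)) {
                result.putAll(stored);
            }
            return result;
        }
        if (!storedIsPattern) {
            // Name -> pattern: keys carry no topic, so populate it from the
            // stored stream name, but only if the new pattern matches that name.
            if (Pattern.matches(newStream, storedStream)) {
                stored.forEach((k, v) -> result.put(new PartitionKey(storedStream, k.partition()), v));
            }
            return result;
        }
        // Pattern -> name and pattern -> pattern: ignore storedStream and
        // filter entries by the topic recorded in each partition key.
        Pattern p = newIsPattern ? Pattern.compile(newStream) : null;
        stored.forEach((k, v) -> {
            boolean keep = newIsPattern
                ? p.matcher(k.topic()).matches()
                : newStream.equals(k.topic());
            if (keep) {
                result.put(k, v);
            }
        });
        return result;
    }
}
```

   For the pattern → topic name case, this sketch leaves the topic in the 
surviving keys. Whether the real keys should drop it when returning to 
single-topic mode is a design question the sketch does not settle.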


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

