zachjsh opened a new issue, #16189:
URL: https://github.com/apache/druid/issues/16189
In SeekableStreamSupervisor::getOffsetsFromMetadataStorage(), we do:
```
if (!ioConfig.getStream().equals(partitions.getStream())) {
  log.warn(
      "Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences",
      partitions.getStream(),
      ioConfig.getStream()
  );
  return Collections.emptyMap();
}
```
This code is inherited by the Kafka supervisor. Here, ioConfig comes from the
ingestion spec, and partitions holds the checkpoints stored in the metadata store
for this datasource. The check compares ioConfig.getStream() and
partitions.getStream() as plain strings, and if they differ, the stored offsets
are ignored.
This interacts awkwardly with Kafka’s multi-topic support:
- ioConfig.getStream() is either topic (name) or topicPattern (pattern),
  whichever is set. (It’s an error for both to be set.) Therefore, it represents
  either a single, static topic name or a regular expression pattern.
- partitions.getStream() is the ioConfig.getStream() that was used for the
  last streaming ingestion job on this datasource. As above, this means it can be
  a name or a pattern.
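A minimal sketch of why the stored string alone cannot say whether it was a name or a pattern. `SpecStreamSketch` and `resolveStream` are hypothetical stand-ins, not Druid's actual IOConfig code; they only model the "whichever of topic or topicPattern is set, and both set is an error" rule described above.

```java
// Hypothetical model of the topic/topicPattern rule; not Druid's real IOConfig class.
public final class SpecStreamSketch {
    private SpecStreamSketch() {}

    /** Returns whichever of topic or topicPattern is set; setting both is an error. */
    public static String resolveStream(String topic, String topicPattern) {
        if (topic != null && topicPattern != null) {
            throw new IllegalArgumentException("Only one of topic or topicPattern may be set");
        }
        return topic != null ? topic : topicPattern;
    }

    public static void main(String[] args) {
        // Both calls return a bare String, so a later comparison cannot tell a name from a pattern.
        System.out.println(resolveStream("vehicle", null));
        System.out.println(resolveStream(null, "vehicle.*"));
    }
}
```

Because the result is just a `String`, a later `equals` on two such values carries no information about which kind of value each one was.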
As a result, stored offsets are ignored in several situations, which may cause
inadvertent data duplication:
- Converting from a topic name to a pattern, even if the pattern matches the
  previous name. For example, converting from topic: `vehicle` to topicPattern:
  `vehicle.*`.
- The inverse: converting from a topic pattern to a name, even if the name
  matches the previous pattern. For example, converting from
  topicPattern: `topicName.*` to topic: `topicName`.
- Changing the pattern when the sets of topics selected by the old and new
  patterns overlap. For example, changing from topicPattern: `foo-.*` to
  topicPattern: `foo-x-.*` when there are topics like `foo-1`, `foo-x-1`, and
  `foo-x-2` would ignore the stored offsets for `foo-x-1` and `foo-x-2`, even
  though they matched both before and after.
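The three situations above can be checked directly. This is an illustrative sketch, not Druid code: `StreamMismatchDemo` and its methods are hypothetical, and it assumes topic patterns are matched against the whole topic name, as `java.util.regex.Pattern.matches` does.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration: exact string comparison rejects cases where regex matching would not.
public final class StreamMismatchDemo {
    private StreamMismatchDemo() {}

    /** Mirrors the existing check: stored offsets survive only if the strings are identical. */
    public static boolean currentCheckKeepsOffsets(String specStream, String storedStream) {
        return specStream.equals(storedStream);
    }

    /** True if the whole topic name matches the regular expression. */
    public static boolean matches(String pattern, String topic) {
        return Pattern.matches(pattern, topic);
    }

    public static void main(String[] args) {
        // Name -> pattern: strings differ, yet the new pattern matches the old name.
        System.out.println(currentCheckKeepsOffsets("vehicle.*", "vehicle"));
        System.out.println(matches("vehicle.*", "vehicle"));

        // Pattern -> name: strings differ, yet the new name matches the old pattern.
        System.out.println(currentCheckKeepsOffsets("topicName", "topicName.*"));
        System.out.println(matches("topicName.*", "topicName"));

        // Pattern -> pattern: strings differ, yet foo-x-1 and foo-x-2 match both patterns.
        System.out.println(currentCheckKeepsOffsets("foo-x-.*", "foo-.*"));
        for (String t : List.of("foo-1", "foo-x-1", "foo-x-2")) {
            System.out.println(t + " old=" + matches("foo-.*", t) + " new=" + matches("foo-x-.*", t));
        }
    }
}
```

In every case the string check returns `false`, while the regex check shows that at least some stored offsets are still relevant.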
We can improve this for Kafka multi-topic support by allowing the Kafka
supervisor to override this method (or some protected/public method it calls).
We have to handle:
- Topic name → pattern – The checkpoints don’t have a topic name in the
  partition keys, so if the partitions container’s getStream() matches the new
  pattern, we need to update the in-memory map, populating the topic field of
  each partition key with the value from getStream().
- Pattern → topic name – The checkpoints have a topic name in the partition
  keys, and we need to filter the map to just those entries whose partition
  key’s topic name exactly matches the new topic name. We ignore the container’s
  getStream().
- Pattern → pattern – The checkpoints have a topic name in the partition keys,
  and we need to filter the map to just those entries whose partition key’s
  topic name matches the new pattern. We ignore the container’s getStream().
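The three cases above can be sketched as a single reconciliation step. This is a hypothetical sketch, not Druid's implementation: `OffsetReconciliationSketch`, the `PartitionKey` record (a stand-in for Kafka partition keys), `reconcile`, and its `newIsPattern` flag are all invented for illustration. It assumes that a key with a `null` topic means the checkpoints came from a single-topic spec, that patterns match whole topic names, and Java 16+ for records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of the proposed offset reconciliation; not Druid's actual code.
public final class OffsetReconciliationSketch {
    private OffsetReconciliationSketch() {}

    /** Stand-in partition key; topic is null for checkpoints written by a single-topic spec. */
    public record PartitionKey(String topic, int partition) {}

    public static Map<PartitionKey, Long> reconcile(
            String storedStream,
            Map<PartitionKey, Long> stored,
            String newStream,
            boolean newIsPattern) {
        // Assumption: missing topic names in the keys mean the old spec used a plain topic name.
        boolean storedIsSingleTopic = stored.keySet().stream().anyMatch(k -> k.topic() == null);
        Map<PartitionKey, Long> result = new HashMap<>();

        if (storedIsSingleTopic) {
            if (newIsPattern) {
                // Topic name -> pattern: keep offsets only if the old name matches the new
                // pattern, filling in the missing topic from the stored getStream() value.
                if (Pattern.matches(newStream, storedStream)) {
                    stored.forEach((k, v) -> result.put(new PartitionKey(storedStream, k.partition()), v));
                }
            } else if (newStream.equals(storedStream)) {
                // Name -> same name: unchanged from the existing behavior.
                result.putAll(stored);
            }
            return result;
        }

        // Pattern -> name or pattern -> pattern: ignore storedStream and filter on each key's topic.
        // Keys keep their topic field here; whether it should be stripped is left open.
        Pattern newPattern = newIsPattern ? Pattern.compile(newStream) : null;
        stored.forEach((k, v) -> {
            boolean keep = newIsPattern
                    ? newPattern.matcher(k.topic()).matches()
                    : k.topic().equals(newStream);
            if (keep) {
                result.put(k, v);
            }
        });
        return result;
    }
}
```

Inferring the old mode from the keys follows the observation above that single-topic checkpoints lack a topic name, so the container's getStream() is only consulted in the name → pattern case.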