I managed to answer some of my own questions :) For future google'ers:
To deal with retention.ms, see https://issues.apache.org/jira/browse/KAFKA-4340. For early rejection of bad timestamps, the message.timestamp.difference.max.ms config is relevant; discussion here: https://issues.apache.org/jira/browse/KAFKA-5344

In our case, we can live with setting retention.ms during backfills (rough sketches of that, and of a skew-tolerating timestamp extractor, are below the quoted mail). Still would like to know if there are any better practices for dealing with mis-stamped records during backfills w/ state store topics.

On Thu, Jul 6, 2017 at 12:32 PM, Greg Fodor <gfo...@gmail.com> wrote:
> Hey all, we are currently working on migrating our system to Kafka 0.10.2 from 0.10.0, and one thing we have hit that I wanted some advice on is dealing with the new log retention/rolling semantics that are based on message timestamps.
>
> We send telemetry data from installed clients into Kafka via the Kafka REST proxy, and the timestamps we land the messages with are "create time" based, stamped on the sender side. We try to adjust for clock skew, but this is not perfect, and in practice some small subset of the data lands in this topic with very erroneous timestamps (for example, some arrive with timestamps many years in the future.)
>
> The first problem we are hitting is that these corrupt timestamps now influence log segment rolling. For example, when reprocessing the entire log, we end up seeing a bunch of segment files generated for the Kafka Streams state store changelogs that store these events, since under the new heuristics a single record timestamped far in the future can trigger a segment roll as it comes in. The result is we end up with hundreds of small segment files (which, in our current configuration, actually ends up causing Kafka to run out of memory, but that's another story :))
>
> The second problem we are hitting is when reprocessing the full log: since these timestamps are in the past as we run from the beginning, if we have a time-based retention policy set on the state store changelog topic (say, a week), Kafka ends up deleting segments immediately, since the timestamps are far in the past and the segments are considered expired. Previously this worked fine during reprocessing, since the state store changelogs were just going to get deleted a week after the reprocess job ran, the retention policy being based on the segment file timestamp.
>
> Both of these problems could potentially be compensated for by writing a clever timestamp extractor that tried to a) normalize timestamps that appear very skewed and b) for changelog entries, extract a "logged at" instead of "created at" timestamp when landing into the state store changelog. The second problem could also be addressed by temporarily changing the topic configuration during a reprocess to prevent "old" log segments from being deleted. Neither of these seems ideal.
>
> I wanted to know if there are any recommendations on how to deal with this -- it seems like there is a conflict between having segment file policies based on message timestamps while also having message timestamps based on application creation time, since origin create time can often be subject to noise/skew/errors. One potential path forward would be the ability to have topic-specific settings for log rolling (including the ability to use the legacy behavior for retention that relies on filesystem timestamps), but I am sure there are problems with this proposal.
> In general, I don't really feel like I have a good sense of what a correct solution would be, other than messages always having two timestamps and being able to have control over which timestamp is authoritative for log segment management policies, but that obviously seems like something that was considered and rejected for KIP-32 already.
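For reference, here's a minimal sketch of how toggling retention.ms around a backfill could look, assuming a broker/client version that ships the Java AdminClient (0.11+); on 0.10.x the same change can be made with the kafka-configs.sh tool. The changelog topic name and retention values are placeholders, and note that the non-incremental alterConfigs call replaces the full set of topic-level overrides, so include any other overrides you need to keep:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BackfillRetentionOverride {

    // Hypothetical changelog topic name; substitute your own.
    private static final String CHANGELOG_TOPIC = "my-app-my-store-changelog";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, CHANGELOG_TOPIC);

            // Before the backfill: disable time-based retention so segments whose
            // message timestamps are far in the past are not expired immediately.
            setRetentionMs(admin, topic, "-1");

            // ... run the reprocessing job here ...

            // After the backfill: restore the normal one-week retention.
            setRetentionMs(admin, topic, String.valueOf(TimeUnit.DAYS.toMillis(7)));
        }
    }

    private static void setRetentionMs(AdminClient admin, ConfigResource topic, String retentionMs)
            throws Exception {
        Config config = new Config(
            Collections.singletonList(new ConfigEntry("retention.ms", retentionMs)));
        admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
    }
}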
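And re: the "clever timestamp extractor" idea from the original mail, a rough sketch of what clamping badly future-skewed timestamps could look like, assuming the two-argument TimestampExtractor interface from 0.10.2 and an arbitrary one-day skew threshold; it would be wired in through the timestamp extractor setting in StreamsConfig:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch of an extractor that falls back to a saner timestamp when the embedded
// CreateTime looks badly skewed (e.g. years in the future).
public class SkewToleratingTimestampExtractor implements TimestampExtractor {

    // Assumption: anything more than a day ahead of the processing clock is
    // treated as corrupt for our data; tune to your expected client skew.
    private static final long MAX_FUTURE_SKEW_MS = 24L * 60 * 60 * 1000;

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        long ts = record.timestamp();
        long now = System.currentTimeMillis();
        if (ts > now + MAX_FUTURE_SKEW_MS) {
            // Fall back to the last valid timestamp seen on this partition, or
            // to wall-clock time if we haven't seen one yet.
            return previousTimestamp > 0 ? previousTimestamp : now;
        }
        return ts;
    }
}

The obvious tradeoff is that clamped records get a processing-time-ish timestamp, which changes event-time semantics for any windowed aggregations they feed, and it doesn't help with the "old timestamps during reprocessing" half of the problem.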