I managed to answer some of my own questions :)

For future Googlers:

- To deal with retention.ms, see
  https://issues.apache.org/jira/browse/KAFKA-4340
- To deal with early rejection of bad timestamps, the
  message.timestamp.difference.max.ms config is relevant; discussion here:
  https://issues.apache.org/jira/browse/KAFKA-5344

In our case, we can live with setting retention.ms during backfills. I'd
still like to know whether there are any better practices for dealing with
mis-stamped records during backfills with state store topics. I've put a
rough sketch of the timestamp extractor idea below the quoted message.
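
For reference, this is roughly what the retention.ms bump around a backfill
looks like. This is only a sketch: it uses the Java AdminClient from 0.11+
and a made-up changelog topic name; on 0.10.x the kafka-configs.sh tool
does the same thing.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BackfillRetentionBump {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      // Hypothetical changelog topic name -- substitute your own.
      ConfigResource topic =
          new ConfigResource(ConfigResource.Type.TOPIC, "my-store-changelog");

      // Raise retention far enough that old-timestamped segments survive
      // the backfill (here ~10 years).
      Config longRetention = new Config(Collections.singletonList(
          new ConfigEntry("retention.ms",
              String.valueOf(10L * 365 * 24 * 60 * 60 * 1000))));

      admin.alterConfigs(Collections.singletonMap(topic, longRetention))
          .all().get();

      // ... run the backfill, then set retention.ms back to normal ...
    }
  }
}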


On Thu, Jul 6, 2017 at 12:32 PM, Greg Fodor <gfo...@gmail.com> wrote:

> Hey all, we are currently working on migrating our system from Kafka
> 0.10.0 to 0.10.2, and one thing we have hit that I wanted some advice on
> is dealing with the new log retention/rolling semantics that are based on
> message timestamps.
>
> We send telemetry data from installed clients into Kafka via the Kafka
> REST proxy, and the timestamps we land the messages with are "create
> time" timestamps set on the sender side. We try to adjust for clock skew,
> but this is not perfect, and in practice a small subset of the data lands
> in this topic with very erroneous timestamps (for example, some records
> arrive with timestamps many years in the future).
>
> The first problem we are hitting is that these corrupt timestamps now
> influence log segment rolling. For example, when reprocessing the entire
> log, we end up seeing a large number of segment files generated for the
> Kafka Streams state store changelogs that store these events, since under
> the new heuristics a single record timestamped far in the future can
> trigger a segment roll. The result is that we end up with hundreds of
> small segment files (which, in our current configuration, actually causes
> Kafka to run out of memory, but that's another story :))
>
> The second problem we are hitting is that when reprocessing the full log
> from the beginning, the message timestamps are far in the past, so if we
> have a time-based retention policy set on the state store changelog topic
> (say, a week), Kafka ends up deleting segments immediately because they
> are considered expired. Previously this worked fine during reprocessing,
> since the state store changelogs would simply get deleted a week after
> the reprocess job ran, as the retention policy was based on the segment
> file timestamp.
>
> Both of these problems could potentially be compensated for by writing a
> clever timestamp extractor that tried to a) normalize timestamps that
> appear badly skewed, and b) for changelog entries, extract a "logged at"
> timestamp instead of a "created at" timestamp when landing records into
> the state store changelog. The second problem could also be addressed by
> temporarily changing the topic configuration during a reprocess to
> prevent "old" log segments from being deleted. Neither of these seems
> ideal.
>
> I wanted to know if there are any recommendations on how to deal with
> this -- it seems like there is a conflict between basing segment file
> policies on message timestamps while also basing message timestamps on
> application creation time, since origin create time can often be subject
> to noise/skew/errors. One potential path forward would be to allow
> topic-specific settings for log rolling (including the ability to use the
> legacy retention behavior that relies on filesystem timestamps), but I am
> sure there are problems with this proposal.
>
> In general, I don't really feel like I have a good sense of what a correct
> solution would be, other than messages always having two timestamps and
> being able to have control over which timestamp is authoritative for log
> segment management policies, but that obviously seems like something that
> was considered and rejected for KIP-32 already.
>
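
For anyone who lands here later, this is the rough shape of the "clever
timestamp extractor" idea from the quoted message -- a minimal sketch
against the 0.10.2 TimestampExtractor interface that clamps obviously
skewed create-time timestamps to wall clock time. The thresholds and
fallback behavior are made up for illustration; it is not what we actually
run, and it doesn't cover the "logged at" vs "created at" distinction for
changelog entries.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch: distrust client-supplied create-time timestamps and clamp
// anything too far in the future (or absurdly old) to processing time.
// The thresholds below are arbitrary examples.
public class SkewClampingTimestampExtractor implements TimestampExtractor {

  private static final long MAX_FUTURE_SKEW_MS = 60 * 60 * 1000L;          // 1 hour
  private static final long MAX_PAST_SKEW_MS = 30L * 24 * 60 * 60 * 1000;  // 30 days

  @Override
  public long extract(final ConsumerRecord<Object, Object> record,
                      final long previousTimestamp) {
    final long now = System.currentTimeMillis();
    final long ts = record.timestamp();

    if (ts > now + MAX_FUTURE_SKEW_MS || ts < now - MAX_PAST_SKEW_MS) {
      // Timestamp looks corrupted/skewed; fall back to processing time
      // (or previousTimestamp, if monotonic stream time is preferred).
      return now;
    }
    return ts;
  }
}

It would get wired in via StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
but as noted above this still feels like a workaround rather than a fix.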
