[ https://issues.apache.org/jira/browse/FLINK-17348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek updated FLINK-17348:
-------------------------------------
Component/s: API / DataStream
> Expose metric group to ascendingTimestampExtractor
> --------------------------------------------------
>
> Key: FLINK-17348
> URL: https://issues.apache.org/jira/browse/FLINK-17348
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Reporter: Theo Diefenthal
> Priority: Major
>
> A common use case with Flink and Kafka is to have many Kafka partitions,
> each with ascending timestamps.
> In my scenario, for various operational reasons, we ship log files from the
> file system to Kafka, one server per partition, and then consume them in
> Flink.
> Sometimes we collect the files into Kafka in the wrong order, which breaks
> the ascending-timestamp assumption. When that happens with the default
> logging violation handler enabled, we produce several GB of logs in a very
> short time, which we would like to avoid.
> What we really want is to track the number of violations in a metric and
> define an alarm on it in our monitoring dashboard.
> Currently, there is sadly no way to access the metric group from the
> AscendingTimestampExtractor. I wish there were something similar to the
> open method on other rich functions.
> My current workaround is to add a custom map task after the source. For
> that task I need to pass along the Kafka partition from the source, which I
> usually don't care about, and I need to track each partition's current
> timestamp manually, exactly as the extractor does. This workaround
> "pollutes" my pipeline quite a bit just for a single metric.
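A minimal sketch of the map-after-source workaround described in the issue, written as plain Java so it runs without Flink on the classpath. It models the check the extractor performs: per Kafka partition, keep the highest timestamp seen so far and treat an element whose timestamp is lower as a violation. Equal timestamps are accepted here, which is assumed to mirror the extractor's non-decreasing check. The names PartitionTimestampTracker, observe and violationCount are hypothetical, and the long field is only a stand-in for a metric counter; in a real job that counter would be registered from a rich function's open method, which is the hook the extractor lacks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the "map after source" workaround:
// tracks the highest timestamp per Kafka partition and counts
// ascending-timestamp violations instead of logging each one.
class PartitionTimestampTracker {
    // Highest accepted timestamp seen so far, keyed by partition id.
    private final Map<Integer, Long> lastTimestampPerPartition = new HashMap<>();

    // Stand-in for a metric counter (in a real Flink job, one registered
    // in open() of a rich function).
    private long violations = 0L;

    // Returns true if the timestamp is non-decreasing for its partition.
    // On a violation, the counter is incremented and the stored
    // timestamp for that partition is left unchanged.
    boolean observe(int partition, long timestamp) {
        Long last = lastTimestampPerPartition.get(partition);
        if (last != null && timestamp < last) {
            violations++;
            return false;
        }
        lastTimestampPerPartition.put(partition, timestamp);
        return true;
    }

    // Number of violations observed across all partitions.
    long violationCount() {
        return violations;
    }
}
```

Unlike a logging handler, each violation here costs one counter increment rather than one log line, so a misordered partition shows up as a rising count that a dashboard alarm can watch. The drawback the issue points out remains: the partition id has to be carried through the pipeline just to feed this check.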
--
This message was sent by Atlassian Jira
(v8.3.4#803005)