[
https://issues.apache.org/jira/browse/FLINK-3264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-3264:
----------------------------------
Labels: auto-deprioritized-major auto-deprioritized-minor (was:
auto-deprioritized-major stale-minor)
Priority: Not a Priority (was: Minor)
This issue was labeled "stale-minor" 7 days ago and has not received any
updates so it is being deprioritized. If this ticket is actually Minor, please
raise the priority and ask a committer to assign you the issue or revive the
public discussion.
> Add load shedding policy into Kafka Consumers
> ---------------------------------------------
>
> Key: FLINK-3264
> URL: https://issues.apache.org/jira/browse/FLINK-3264
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Robert Metzger
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> There are situations when Flink's Kafka Consumer is not able to consume
> everything produced into a topic, for example when one Flink instance is
> subscribed to a busy Kafka topic (See user request:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Frequent-exceptions-killing-streaming-job-td4323.html
> )
> I think we should allow users to control the behavior of the Kafka consumer
> in those situations.
> I had an offline discussion with [~StephanEwen] about this, and we think that
> allowing users to pass a LoadSheddingPolicy to the KafkaConsumer would be
> the best solution.
> In the policy, users can define how often the consumer requests the
> latest offsets of the subscribed partitions. The requests can be based either
> on time (every n ms) or on record count (every n-th record). The policy
> can then decide to skip a certain number of offsets (maybe even jump to the
> latest offset).
> With the "offset skipping" approach, we avoid fetching records that we cannot
> process anyway.
> In the 0.9 consumer, there doesn't seem to be an API for requesting the
> latest offset of a topicPartition. I'll ask on the Kafka ML what the status
> is there.
> With {{seek()}} we can fetch from any offset.
> In the 0.8 SimpleConsumer, there is a method for requesting the offsets:
> {code}
> kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
>     requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
> OffsetResponse response = consumer.getOffsetsBefore(request);
> {code}
> The fetch offset is controlled within the {{LegacyFetcher}}.
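
To make the proposal above more concrete, here is a minimal sketch of what
such a policy could look like. It is only an illustration under stated
assumptions: LoadSheddingPolicy is named in the issue, but its methods
(checkEveryNRecords, nextOffset), the SkipToLatestPolicy class and the run()
simulation are hypothetical and exist in neither Flink nor Kafka. Only the
record-count trigger is modelled, and the log end is held constant so the
behaviour stays deterministic.

```java
/** Hypothetical sketch only; none of these types exist in Flink or Kafka. */
class LoadSheddingSketch {

    /** Assumed shape of the policy proposed in the issue. */
    interface LoadSheddingPolicy {
        /** Number of processed records between two latest-offset checks. */
        long checkEveryNRecords();

        /**
         * Decides where to continue reading.
         * Returning currentOffset means "do not shed any load".
         */
        long nextOffset(long currentOffset, long latestOffset);
    }

    /** Example policy: once the lag exceeds maxLag, jump to the latest offset. */
    static final class SkipToLatestPolicy implements LoadSheddingPolicy {
        private final long maxLag;
        private final long checkEvery;

        SkipToLatestPolicy(long maxLag, long checkEvery) {
            this.maxLag = maxLag;
            this.checkEvery = checkEvery;
        }

        @Override
        public long checkEveryNRecords() {
            return checkEvery;
        }

        @Override
        public long nextOffset(long currentOffset, long latestOffset) {
            long lag = latestOffset - currentOffset;
            return lag > maxLag ? latestOffset : currentOffset;
        }
    }

    /**
     * Simulates a consumer that may process at most `budget` records while
     * the log end stays fixed at latestOffset. Returns how many offsets the
     * policy skipped. In a real consumer, the skip would be a seek.
     */
    static long run(LoadSheddingPolicy policy, long startOffset,
                    long latestOffset, long budget) {
        long offset = startOffset;
        long skipped = 0;
        long sinceCheck = 0;
        for (long processed = 0; processed < budget && offset < latestOffset; processed++) {
            offset++;      // "process" one record
            sinceCheck++;
            if (sinceCheck >= policy.checkEveryNRecords()) {
                sinceCheck = 0;
                long next = policy.nextOffset(offset, latestOffset);
                if (next > offset) {   // never move backwards
                    skipped += next - offset;
                    offset = next;
                }
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        LoadSheddingPolicy policy = new SkipToLatestPolicy(100, 10);
        System.out.println("skipped (far behind): " + run(policy, 0, 1000, 50));
        System.out.println("skipped (close to end): " + run(policy, 950, 1000, 50));
    }
}
```

A time-based trigger (every n ms) would fit the same interface by replacing
the record counter in run() with a timestamp comparison. Whether the policy
should see per-partition offsets or an aggregate lag is left open here.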