[
https://issues.apache.org/jira/browse/CAMEL-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Otavio Rodolfo Piske updated CAMEL-13768:
-----------------------------------------
Summary: camel-kafka: seek to specific offset and KafkaConsumer access
(was: Seek to specific offset and KafkaConsumer access )
> camel-kafka: seek to specific offset and KafkaConsumer access
> --------------------------------------------------------------
>
> Key: CAMEL-13768
> URL: https://issues.apache.org/jira/browse/CAMEL-13768
> Project: Camel
> Issue Type: New Feature
> Components: camel-kafka
> Affects Versions: 2.24.1, 3.x
> Reporter: michael elbaz
> Assignee: Otavio Rodolfo Piske
> Priority: Major
> Fix For: 3.12.0
>
>
> 1. Provide a way to rewind the Kafka offset to a specific offset (improve seekTo?).
> There is currently no way to do that using the camel-kafka component. The main idea
> is to replay older Kafka messages without starting from the beginning.
> For example:
> https://blog.sysco.no/integration/kafka-rewind-consumers-offset/
> {code:java}
> // Assumes an existing, subscribed KafkaConsumer<String, String> named consumer
> // and a static import of java.time.temporal.ChronoUnit.MINUTES.
> boolean flag = true;
> while (true) {
>     ConsumerRecords<String, String> records = consumer.poll(100);
>     if (flag) {
>         Map<TopicPartition, Long> query = new HashMap<>();
>         query.put(
>             new TopicPartition("simple-topic-1", 0),
>             Instant.now().minus(10, MINUTES).toEpochMilli());
>         // Get the offset for each partition from the timestamp
>         Map<TopicPartition, OffsetAndTimestamp> result =
>             consumer.offsetsForTimes(query);
>         // Rewind each partition to that offset using seek(); the value is
>         // null when a partition has no record at or after the timestamp
>         result.forEach((partition, offsetAndTimestamp) -> {
>             if (offsetAndTimestamp != null) {
>                 consumer.seek(partition, offsetAndTimestamp.offset());
>             }
>         });
>         flag = false;
>     }
>     for (ConsumerRecord<String, String> record : records) {
>         System.out.printf("offset = %d, key = %s, value = %s%n",
>             record.offset(), record.key(), record.value());
>     }
> }
> {code}
> 2. Provide a way to access the KafkaConsumer.
> Add a Camel header with a reference to the KafkaConsumer so that a route can
> make Kafka API calls directly. This could work the same way as KafkaManualCommit:
> {code:java}
> public void process(Exchange exchange) {
>     KafkaManualCommit manual =
>         exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
>             KafkaManualCommit.class);
>     manual.commitSync();
> }
> {code}
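A note on the timestamp lookup in item 1: as far as I understand it, KafkaConsumer.offsetsForTimes returns, for each partition, the earliest offset whose timestamp is greater than or equal to the requested timestamp, and null when no such record exists. The sketch below illustrates only that selection rule in plain Java, with no Kafka dependency. The class OffsetLookup, its method, and the sample timestamps are hypothetical and are not part of camel-kafka or the Kafka client.

```java
import java.util.OptionalLong;

// Hypothetical illustration only: models a single partition as an array
// where the index is the offset and the value is the record timestamp.
final class OffsetLookup {

    // Returns the earliest offset whose timestamp is >= targetMs,
    // or an empty result when no record qualifies.
    static OptionalLong offsetForTime(long[] timestampsByOffset, long targetMs) {
        // Linear scan: timestamps are not assumed to be sorted.
        for (int offset = 0; offset < timestampsByOffset.length; offset++) {
            if (timestampsByOffset[offset] >= targetMs) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 2000L, 3000L, 4000L}; // made-up sample data
        OptionalLong found = offsetForTime(timestamps, 2500L);
        if (found.isPresent()) {
            System.out.println("seek to offset " + found.getAsLong());
        } else {
            System.out.println("no record at or after the timestamp");
        }
    }
}
```

The empty result corresponds to the null map value in the real API, which is why a caller should check the value before calling seek().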
--
This message was sent by Atlassian Jira
(v8.3.4#803005)