[
https://issues.apache.org/jira/browse/CAMEL-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
michael elbaz updated CAMEL-12732:
----------------------------------
Summary: Kafka manual commit to file repository doesn't work (using Spring
boot) (was: Kafka manual commit to file repository doesn't work)
> Kafka manual commit to file repository doesn't work (using Spring boot)
> -----------------------------------------------------------------------
>
> Key: CAMEL-12732
> URL: https://issues.apache.org/jira/browse/CAMEL-12732
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 2.22.0
> Environment: Spring boot
> kafka_2.11-1.1.0
> Reporter: michael elbaz
> Priority: Major
>
> I'm trying to save the Kafka offset into a FileStateRepository. The offset is
> written correctly, but it is not read back at route start, so Camel re-reads
> the whole topic every time.
>
> {code:java}
> @Component
> public class Route extends RouteBuilder {
>
>     @Override
>     public void configure() throws Exception {
>         from(kafka())
>             .to("log:TEST?level=INFO")
>             .process(Route::commitKafka);
>     }
>
>     private String kafka() {
>         String kafkaEndpoint = "kafka:";
>         kafkaEndpoint += "topictest";
>         kafkaEndpoint += "?brokers=";
>         kafkaEndpoint += "localhost:9092";
>         kafkaEndpoint += "&groupId=";
>         kafkaEndpoint += "TEST";
>         kafkaEndpoint += "&autoOffsetReset=";
>         kafkaEndpoint += "earliest";
>         kafkaEndpoint += "&autoCommitEnable=";
>         kafkaEndpoint += false;
>         kafkaEndpoint += "&allowManualCommit=";
>         kafkaEndpoint += true;
>         kafkaEndpoint += "&offsetRepository=";
>         kafkaEndpoint += "#fileStore";
>         return kafkaEndpoint;
>     }
>
>     @Bean(name = "fileStore")
>     private FileStateRepository fileStateRepository() {
>         FileStateRepository fileStateRepository =
>             FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
>         // This will be empty
>         // System.out.println(fileStateRepository.getCache());
>         return fileStateRepository;
>     }
>
>     private static void commitKafka(Exchange exchange) {
>         KafkaManualCommit manual =
>             exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
>         manual.commitSync();
>     }
> }
> {code}
>
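The "This will be empty" comment in the report may have a simple explanation: a file-backed store that loads its file only when it is started has an empty cache right after construction. Whether that lifecycle is what keeps the offset from being read at route start is not confirmed here. The sketch below uses only the JDK and is not Camel's implementation. The class FileStoreSketch, its start()/setState()/getState() methods, the "key=value" file layout, and the key "topictest/0" are all hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a file-backed state store; NOT Camel's FileStateRepository.
final class FileStoreSketch {
    private final Path file;
    private final Map<String, String> cache = new HashMap<>();

    public FileStoreSketch(Path file) {
        // Construction only remembers the path; nothing is read from disk yet.
        this.file = file;
    }

    // Loads "key=value" lines from the file into the in-memory cache.
    public void start() throws IOException {
        cache.clear();
        if (!Files.exists(file)) {
            return;
        }
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            int sep = line.indexOf('=');
            if (sep > 0) {
                cache.put(line.substring(0, sep), line.substring(sep + 1));
            }
        }
    }

    // Updates the cache and rewrites the whole file.
    public void setState(String key, String value) throws IOException {
        cache.put(key, value);
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, String> e : cache.entrySet()) {
            lines.add(e.getKey() + "=" + e.getValue());
        }
        Files.write(file, lines, StandardCharsets.UTF_8);
    }

    // Reads from the cache only; returns null if the key is unknown or not yet loaded.
    public String getState(String key) {
        return cache.get(key);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("offset_repo", ".dat");

        // First run: start the store and persist an offset.
        FileStoreSketch writer = new FileStoreSketch(file);
        writer.start();
        writer.setState("topictest/0", "42");

        // Second run: a fresh instance over the same file.
        FileStoreSketch reader = new FileStoreSketch(file);
        System.out.println(reader.getState("topictest/0")); // null: not started yet
        reader.start();
        System.out.println(reader.getState("topictest/0")); // 42: loaded from the file
    }
}
```

If the real repository behaves in a similar way, printing its cache inside the @Bean method would show nothing even when the file on disk holds a committed offset, so checking the cache after the Camel context has started would be a more telling diagnostic.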
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)