[ https://issues.apache.org/jira/browse/CAMEL-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581403#comment-16581403 ]
ASF GitHub Bot commented on CAMEL-12732: ---------------------------------------- GitHub user mikadev opened a pull request: https://github.com/apache/camel/pull/2475 CAMEL-12732 Take into account autoCommitEnable Check if autocommit is enabled before setting the state (Fix the manual commit for the state) You can merge this pull request into a Git repository by running: $ git pull https://github.com/mikadev/camel patch-3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/camel/pull/2475.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2475 ---- commit 880656f64c35cb3f8cdac09236890917d2a0f2e9 Author: elbaz michael <michaelelbaz@...> Date: 2018-08-15T17:40:22Z CAMEL-12732 Take into account autoCommitEnable Check if autocommit is enabled before setting the state (Fix the manual commit for the state) ---- > Kafka manual commit to file repository doesn't work properly (using Spring > boot) > -------------------------------------------------------------------------------- > > Key: CAMEL-12732 > URL: https://issues.apache.org/jira/browse/CAMEL-12732 > Project: Camel > Issue Type: Bug > Components: camel-kafka > Affects Versions: 2.22.0 > Environment: Spring boot > kafka_2.11-1.1.0 > Reporter: michael elbaz > Priority: Major > > I'im trying to save the Kafka offset into FileStateRepository, the offset is > correctly writing but it is not reading at route start so camel will read all > the topic every time > > {code:java} > @Component > public class Route extends RouteBuilder { > @Override > public void configure() throws Exception { > from(kafka()) > .to("log:TEST?level=INFO") > .process(Route::commitKafka); > } > private String kafka() { > String kafkaEndpoint = "kafka:"; > kafkaEndpoint += "topictest"; > kafkaEndpoint += "?brokers="; > kafkaEndpoint += "localhost:9092"; > kafkaEndpoint += "&groupId="; > kafkaEndpoint += "TEST"; > kafkaEndpoint += "&autoOffsetReset="; > kafkaEndpoint += "earliest"; > kafkaEndpoint += "&autoCommitEnable="; > kafkaEndpoint += false; > kafkaEndpoint += "&allowManualCommit="; > kafkaEndpoint += true; > kafkaEndpoint += "&offsetRepository="; > kafkaEndpoint += "#fileStore"; > return kafkaEndpoint; > } > @Bean(name = "fileStore") > private FileStateRepository fileStateRepository() { > FileStateRepository fileStateRepository = > FileStateRepository.fileStateRepository(new > File("/kafka/offset_repo/repo.dat")); > // This will be empty > // System.out.println(fileStateRepository.getCache()); > return fileStateRepository; > } > private static void commitKafka(Exchange exchange) { > KafkaManualCommit manual = > exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, > KafkaManualCommit.class); > manual.commitSync(); > } > } > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)