[ 
https://issues.apache.org/jira/browse/CAMEL-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16582506#comment-16582506
 ] 

ASF GitHub Bot commented on CAMEL-12732:
----------------------------------------

davsclaus closed pull request #2478: CAMEL-12732 Take into account isAllowManualCommit
URL: https://github.com/apache/camel/pull/2478
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index e6913f3c698..df3b4b5769f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -399,7 +399,7 @@ protected boolean doRun() {
 
         private void commitOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset, boolean forceCommit) {
             if (partitionLastOffset != -1) {
-                if (offsetRepository != null) {
+                if (!endpoint.getConfiguration().isAllowManualCommit() && offsetRepository != null) {
                     log.debug("Saving offset repository state {} from topic {} with offset: {}", threadId, topicName, partitionLastOffset);
                     offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset));
                 } else if (forceCommit) {
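
To make the effect of the changed condition easier to see, here is a minimal
sketch that models only the branch selection in commitOffset(...) before and
after the patch. It is not Camel code: the class CommitDecisionSketch, the
Action enum, and both method names are hypothetical, and the body of the
"else if (forceCommit)" branch is not shown in the diff, so it is represented
only by a placeholder value.

```java
// Hypothetical, simplified model of the condition changed in the diff above.
// Nothing here is part of Camel; all names are illustrative assumptions.
final class CommitDecisionSketch {

    enum Action {
        SAVE_TO_REPOSITORY,    // the offsetRepository.setState(...) branch
        FORCED_COMMIT_BRANCH,  // the "else if (forceCommit)" branch (body not shown in the diff)
        NOTHING                // neither of the two shown branches runs
    }

    /** Branch selection before the patch: a configured repository always wins. */
    static Action decideBeforePatch(boolean hasOffsetRepository,
                                    long partitionLastOffset,
                                    boolean forceCommit) {
        if (partitionLastOffset == -1) {
            return Action.NOTHING;
        }
        if (hasOffsetRepository) {
            return Action.SAVE_TO_REPOSITORY;
        } else if (forceCommit) {
            return Action.FORCED_COMMIT_BRANCH;
        }
        return Action.NOTHING;
    }

    /** Branch selection after the patch: the repository branch is skipped when manual commit is allowed. */
    static Action decideAfterPatch(boolean allowManualCommit,
                                   boolean hasOffsetRepository,
                                   long partitionLastOffset,
                                   boolean forceCommit) {
        if (partitionLastOffset == -1) {
            return Action.NOTHING;
        }
        if (!allowManualCommit && hasOffsetRepository) {
            return Action.SAVE_TO_REPOSITORY;
        } else if (forceCommit) {
            return Action.FORCED_COMMIT_BRANCH;
        }
        return Action.NOTHING;
    }

    public static void main(String[] args) {
        // Configuration resembling the reporter's route:
        // allowManualCommit=true with an offset repository configured.
        System.out.println("before patch: " + decideBeforePatch(true, 42L, false));
        System.out.println("after patch:  " + decideAfterPatch(true, true, 42L, false));
    }
}
```

Under this model, a route with allowManualCommit=true and an offset repository
no longer reaches the repository branch from this method; with
allowManualCommit=false the behaviour is the same as before the patch.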


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka manual commit to file repository doesn't work properly (using Spring boot)
> --------------------------------------------------------------------------------
>
>                 Key: CAMEL-12732
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12732
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.22.0
>         Environment: Spring boot
> kafka_2.11-1.1.0
>            Reporter: michael elbaz
>            Assignee: Claus Ibsen
>            Priority: Major
>             Fix For: 2.23.0
>
>
> I'm trying to save the Kafka offset into a FileStateRepository. The offset is
> written correctly, but it is not read back at route start, so Camel re-reads
> the whole topic every time.
>  
> {code:java}
> @Component
> public class Route extends RouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         from(kafka())
>                 .to("log:TEST?level=INFO")
>                 .process(Route::commitKafka);
>     }
>     private String kafka() {
>         String kafkaEndpoint = "kafka:";
>         kafkaEndpoint += "topictest";
>         kafkaEndpoint += "?brokers=";
>         kafkaEndpoint += "localhost:9092";
>         kafkaEndpoint += "&groupId=";
>         kafkaEndpoint += "TEST";
>         kafkaEndpoint += "&autoOffsetReset=";
>         kafkaEndpoint += "earliest";
>         kafkaEndpoint += "&autoCommitEnable=";
>         kafkaEndpoint += false;
>         kafkaEndpoint += "&allowManualCommit=";
>         kafkaEndpoint += true;
>         kafkaEndpoint += "&offsetRepository=";
>         kafkaEndpoint += "#fileStore";
>         return kafkaEndpoint;
>     }
>     @Bean(name = "fileStore")
>     private FileStateRepository fileStateRepository() {
>         FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
>         // This will be empty
>         // System.out.println(fileStateRepository.getCache());
>         return fileStateRepository;
>     }
>     private static void commitKafka(Exchange exchange) {
>         KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
>         manual.commitSync();
>     }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
