orpiske commented on a change in pull request #6093:
URL: https://github.com/apache/camel/pull/6093#discussion_r708879905



##########
File path: 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.spi.StateRepository;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.deserializeOffsetValue;
+import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey;
+
+public class PartitionAssignmentListener implements ConsumerRebalanceListener {
+    private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class);
+
+    private final String threadId;
+    private final String topicName;
+    private final KafkaConfiguration configuration;
+    private final KafkaConsumer consumer;
+    private final Map<String, Long> lastProcessedOffset;
+    private Supplier<Boolean> stopStateSupplier;
+
+    public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration,
+                                       KafkaConsumer consumer, Map<String, Long> lastProcessedOffset,
+                                       Supplier<Boolean> stopStateSupplier) {
+        this.threadId = threadId;
+        this.topicName = topicName;
+        this.configuration = configuration;
+        this.consumer = consumer;
+        this.lastProcessedOffset = lastProcessedOffset;
+        this.stopStateSupplier = stopStateSupplier;
+    }
+
+    private void resumeFromOffset(TopicPartition topicPartition, String offsetState) {
+        // The state contains the last read offset, so we need to seek from the next one
+        long offset = deserializeOffsetValue(offsetState) + 1;
+        LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
+        consumer.seek(topicPartition, offset);
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName);
+
+        // if camel is stopping, or we are not running
+        boolean stopping = stopStateSupplier.get();
+
+        for (TopicPartition partition : partitions) {
+            String offsetKey = serializeOffsetKey(partition);
+            Long offset = lastProcessedOffset.get(offsetKey);
+            if (offset == null) {
+                offset = -1L;
+            }
+            try {
+                // only commit offsets if the component has control
+                if (configuration.getAutoCommitEnable()) {
+                    KafkaRecordProcessor.commitOffset(configuration, consumer, partition, offset, stopping, false, threadId);
+                }
+            } catch (Exception e) {
+                LOG.error("Error saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey,

Review comment:
       This is pretty much the same code as we had before, and the behavior should be roughly this:
   
   Exceptions raised either here or in the `onPartitionsAssigned` method are propagated to the Kafka consumer's `poll` [1] method. In our case, this means they would be handled as if `poll` itself had failed [2], or as if one of the subsequent operations (i.e. process) had, and thus handled according to the poll exception strategy [3].
   
   1. https://github.com/orpiske/camel/blob/camel-kafka-use-executor/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java#L174
   2. https://github.com/orpiske/camel/blob/camel-kafka-use-executor/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java#L200
   3. https://github.com/orpiske/camel/blob/camel-kafka-use-executor/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java#L209
   
   Personally, I think the ERROR level is adequate in this case because the exception would later appear as coming from `poll`. If we decreased it to WARN level, users might overlook and miss it ... but that's just my 2 cents, and I think we can change it anyway.
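   To illustrate the propagation path described above, here is a minimal sketch. It does not use the real Kafka client: `FakeConsumer` and the `RebalanceListener` interface are made-up stand-ins that mimic the `KafkaConsumer` contract, namely that an exception thrown from a rebalance callback escapes the `poll()` call itself, so the caller's poll exception strategy is what handles it.

```java
import java.util.List;

// Hypothetical stand-ins mimicking the Kafka consumer contract: an exception
// thrown by the rebalance listener propagates out of poll() to the caller.
public class RebalancePropagationSketch {

    interface RebalanceListener {
        void onPartitionsRevoked(List<Integer> partitions);
    }

    static class FakeConsumer {
        private final RebalanceListener listener;

        FakeConsumer(RebalanceListener listener) {
            this.listener = listener;
        }

        // poll() triggers a rebalance; any listener exception escapes here,
        // which is how a listener failure ends up looking like a poll failure.
        String poll() {
            listener.onPartitionsRevoked(List.of(0, 1));
            return "records";
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(partitions -> {
            // e.g. an offset commit failing during partition revocation
            throw new IllegalStateException("commit failed during revoke");
        });
        try {
            consumer.poll();
        } catch (IllegalStateException e) {
            // The caller's poll exception strategy handles it, as in
            // KafkaFetchRecords: the failure appears to come from poll.
            System.out.println("poll failed: " + e.getMessage());
        }
    }
}
```

   This is why logging at ERROR in the listener is defensible: the same failure later resurfaces from `poll`, and the listener-side log entry is the only place that records where it actually originated.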




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

