orpiske commented on a change in pull request #6093: URL: https://github.com/apache/camel/pull/6093#discussion_r708879905
########## File path: components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.consumer.support; + +import java.util.Collection; +import java.util.Map; +import java.util.function.Supplier; + +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.spi.StateRepository; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.deserializeOffsetValue; +import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey; + +public class PartitionAssignmentListener implements ConsumerRebalanceListener { + private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class); + + private final String threadId; + private final String topicName; + private final KafkaConfiguration configuration; + private final KafkaConsumer consumer; + private final Map<String, Long> lastProcessedOffset; + private Supplier<Boolean> stopStateSupplier; + + public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration, + KafkaConsumer consumer, Map<String, Long> lastProcessedOffset, + Supplier<Boolean> stopStateSupplier) { + this.threadId = threadId; + this.topicName = topicName; + this.configuration = configuration; + this.consumer = consumer; + this.lastProcessedOffset = lastProcessedOffset; + this.stopStateSupplier = stopStateSupplier; + } + + private void resumeFromOffset(TopicPartition topicPartition, String offsetState) { + // The state contains the last read offset, so you need to seek from the next one + long offset = deserializeOffsetValue(offsetState) + 1; + LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset); + consumer.seek(topicPartition, offset); + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName); + + // if camel is stopping, or we are not running + boolean stopping = stopStateSupplier.get(); + + for (TopicPartition partition : partitions) { + String offsetKey = serializeOffsetKey(partition); + Long offset = lastProcessedOffset.get(offsetKey); + if (offset == null) { + offset = -1L; + } + try { + // only commit offsets if the component has control + if (configuration.getAutoCommitEnable()) { + KafkaRecordProcessor.commitOffset(configuration, consumer, partition, offset, stopping, false, threadId); + } + } catch (Exception e) { + LOG.error("Error saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, Review comment: This was pretty much the same code as we had before and the behavior should be more or less like this: Exceptions raised either here or on the `onPartitionsAssigned` method will be propagated to the Kafka's consumer `poll` [1] method. In our case, this means it would be handled as it poll itself has failed [2] or one of the subsequent operations (i.e.: process), thus being handled according to the poll exception strategy [3]. 1. https://github.com/orpiske/camel/blob/camel-kafka-use-executor/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java#L174 2. https://github.com/orpiske/camel/blob/camel-kafka-use-executor/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java#L200 3. https://github.com/orpiske/camel/blob/camel-kafka-use-executor/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java#L209 Personally, I think the error level is adequate in this case because the exception would later on appear as coming from poll. I think that if we decrease to `WARN` level, users might overlook it an miss it ... but that's just my .2 cents and I think we can change it anyway. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
