davsclaus commented on a change in pull request #6093: URL: https://github.com/apache/camel/pull/6093#discussion_r708838883
########## File path: components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.consumer.support; + +import java.util.Collection; +import java.util.Map; +import java.util.function.Supplier; + +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.spi.StateRepository; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.deserializeOffsetValue; +import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey; + +public class PartitionAssignmentListener implements ConsumerRebalanceListener { + private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class); + + private final String threadId; + private final String topicName; + private final KafkaConfiguration configuration; + 
private final KafkaConsumer consumer; + private final Map<String, Long> lastProcessedOffset; + private Supplier<Boolean> stopStateSupplier; + + public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration, + KafkaConsumer consumer, Map<String, Long> lastProcessedOffset, + Supplier<Boolean> stopStateSupplier) { + this.threadId = threadId; + this.topicName = topicName; + this.configuration = configuration; + this.consumer = consumer; + this.lastProcessedOffset = lastProcessedOffset; + this.stopStateSupplier = stopStateSupplier; + } + + private void resumeFromOffset(TopicPartition topicPartition, String offsetState) { + // The state contains the last read offset, so you need to seek from the next one + long offset = deserializeOffsetValue(offsetState) + 1; + LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset); + consumer.seek(topicPartition, offset); + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName); + + // if camel is stopping, or we are not running + boolean stopping = stopStateSupplier.get(); + + for (TopicPartition partition : partitions) { + String offsetKey = serializeOffsetKey(partition); + Long offset = lastProcessedOffset.get(offsetKey); + if (offset == null) { + offset = -1L; + } + try { + // only commit offsets if the component has control + if (configuration.getAutoCommitEnable()) { + KafkaRecordProcessor.commitOffset(configuration, consumer, partition, offset, stopping, false, threadId); + } + } catch (Exception e) { + LOG.error("Error saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, Review comment: I wonder what Kafka can do when this method throws an exception. Should this be logged at WARN instead of ERROR, and not rethrown? What happens in Kafka when a runtime exception is thrown from this method?
########## File path: components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ########## @@ -139,8 +142,13 @@ protected void doStop() throws Exception { LOG.debug("Shutting down Kafka consumer worker threads with timeout {} millis", timeout); getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor, timeout); } else { - executor.shutdownNow(); + executor.shutdown(); } + + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.warn("The tasks did not finish within the specified time"); Review comment: This WARN message needs more details so end users can understand it. Also, the 5-second timeout is hardcoded. ########## File path: components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.camel.component.kafka.consumer.support; + +public interface ResumeStrategy { Review comment: Add javadoc to this SPI ########## File path: components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.consumer.support; + +import org.apache.camel.spi.StateRepository; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public final class ResumeStrategyFactory { + // NO-OP ... + private static class NoOpResumeStrategy implements ResumeStrategy { + @Override + public void resume() { + + } + + @Override + public String describe() { + return "no-op"; + } + } + + private static final NoOpResumeStrategy NO_OP_RESUME_STRATEGY = new NoOpResumeStrategy(); + + private ResumeStrategyFactory() { + } + + public static ResumeStrategy newResumeStrategy( Review comment: We should make this pluggable so end users can use their own implementations. 
But that can be another JIRA ########## File path: components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java ########## @@ -71,361 +75,322 @@ this.kafkaProps = kafkaProps; } + void preInit() { + createConsumer(); + + StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository(); + String seekPolicy = kafkaConsumer.getEndpoint().getConfiguration().getSeekTo(); + resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer, offsetRepository, seekPolicy); + } + @Override public void run() { - boolean first = true; - final AtomicBoolean reTry = new AtomicBoolean(true); - final AtomicBoolean reConnect = new AtomicBoolean(true); + if (!isKafkaConsumerRunnable()) { + return; + } - while (reTry.get() || reConnect.get()) { + if (isRetrying() || isReconnecting()) { try { - if (first || reConnect.get()) { + if (isReconnecting()) { // re-initialize on re-connect so we have a fresh consumer - doInit(); + createConsumer(); } } catch (Exception e) { // ensure this is logged so users can see the problem LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e); } - if (!first) { - // skip one poll timeout before trying again - long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); - String prefix = reConnect.get() ? 
"Reconnecting" : "Retrying"; - LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay); - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - boolean stopping = kafkaConsumer.getEndpoint().getCamelContext().isStopping(); - if (stopping) { - LOG.info( - "CamelContext is stopping so terminating KafkaConsumer thread: {} receiving from topic: {}", - threadId, topicName); - return; - } - } - } - - first = false; - - if (isCloseable()) { - LOG.debug("Closing consumer {}", threadId); - IOHelper.close(consumer); - return; - } + long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); + String prefix = isReconnecting() ? "Reconnecting" : "Retrying"; + LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay); - // doRun keeps running until we either shutdown or is told to re-connect - doRun(reTry, reConnect); + doRun(); } LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName); + safeUnsubscribe(); + IOHelper.close(consumer); } - private boolean isCloseable() { - return !kafkaConsumer.isRunAllowed() || kafkaConsumer.isStoppingOrStopped() || kafkaConsumer.isSuspendingOrSuspended(); - } - - void preInit() { - doInit(); - } - - protected void doInit() { + protected void createConsumer() { // create consumer ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader(); try { - // Kafka uses reflection for loading authentication settings, - // use its classloader + // Kafka uses reflection for loading authentication settings, use its classloader Thread.currentThread() .setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader()); - // this may throw an exception if something is wrong with kafka - // consumer + + // this may throw an exception if something is wrong with kafka consumer this.consumer = kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(kafkaProps); } finally { 
Thread.currentThread().setContextClassLoader(threadClassLoader); } } - @SuppressWarnings("unchecked") - protected void doRun(AtomicBoolean retry, AtomicBoolean reconnect) { - if (reconnect.get()) { + protected void doRun() { + if (isReconnecting()) { + subscribe(); + // on first run or reconnecting - doReconnectRun(); - // set reconnect to false as its done now - reconnect.set(false); - // set retry to true to continue polling - retry.set(true); - } - // polling - doPollRun(retry, reconnect); - } + resume(); - protected void doReconnectRun() { - if (topicPattern != null) { - LOG.info("Subscribing {} to topic pattern {}", threadId, topicName); - consumer.subscribe(topicPattern, this); - } else { - LOG.info("Subscribing {} to topic {}", threadId, topicName); - consumer.subscribe(Arrays.asList(topicName.split(",")), this); - } + // set reconnect to false as the connection and resume is done at this point + setReconnect(false); - StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository(); - if (offsetRepository != null) { - resumeFromOffsetRepository(offsetRepository); - } else if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo() != null) { - resumeFromSeekPolicy(); + // set retry to true to continue polling + setRetry(true); } - } - private void resumeFromOffsetRepository(StateRepository<String, String> offsetRepository) { - for (TopicPartition topicPartition : (Set<TopicPartition>) consumer.assignment()) { - String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition)); - if (offsetState != null && !offsetState.isEmpty()) { - resumeFromOffset(topicPartition, offsetState); - } - } + // start polling + startPolling(); } - private void resumeFromOffset(TopicPartition topicPartition, String offsetState) { - // The state contains the last read offset, so you need to seek from the next one - long offset = deserializeOffsetValue(offsetState) + 1; - LOG.debug("Resuming partition {} 
from offset {} from state", topicPartition.partition(), offset); - consumer.seek(topicPartition, offset); + protected void resume() { + resumeStrategy.resume(); } - private void resumeFromSeekPolicy() { - if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo().equals("beginning")) { - LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName); - // This poll to ensure we have an assigned partition - // otherwise seek won't work - consumer.poll(Duration.ofMillis(100)); - consumer.seekToBeginning(consumer.assignment()); - } else if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo().equals("end")) { - LOG.debug("{} is seeking to the end on topic {}", threadId, topicName); - // This poll to ensures we have an assigned partition - // otherwise seek won't work - consumer.poll(Duration.ofMillis(100)); - consumer.seekToEnd(consumer.assignment()); + private void subscribe() { + PartitionAssignmentListener listener = new PartitionAssignmentListener( + threadId, topicName, + kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset, this::isRunnable); + + if (topicPattern != null) { + LOG.info("Subscribing {} to topic pattern {}", threadId, topicName); + consumer.subscribe(topicPattern, listener); + } else { + LOG.info("Subscribing {} to topic {}", threadId, topicName); + consumer.subscribe(Arrays.asList(topicName.split(",")), listener); } } - protected void doPollRun(AtomicBoolean retry, AtomicBoolean reconnect) { - StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository(); - - // allow to re-connect thread in case we use that to retry failed messages - boolean unsubscribing = false; - - TopicPartition partition = null; + protected void startPolling() { long partitionLastOffset = -1; try { - while (kafkaConsumer.isRunAllowed() && !kafkaConsumer.isStoppingOrStopped() - && !kafkaConsumer.isSuspendingOrSuspended() - && retry.get() && !reconnect.get()) { - - // 
flag to break out processing on the first exception - boolean breakOnErrorHit = false; + while (isKafkaConsumerRunnable() && isRetrying() && !isReconnecting()) { long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs); ConsumerRecords<Object, Object> allRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); - - Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator(); - while (partitionIterator.hasNext()) { - partition = partitionIterator.next(); - partitionLastOffset = -1; - - Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator(); - LOG.debug("Records count {} received for partition {}", allRecords.records(partition).size(), - partition); - if (!breakOnErrorHit && recordIterator.hasNext()) { - - while (!breakOnErrorHit && recordIterator.hasNext()) { - ConsumerRecord<Object, Object> record = recordIterator.next(); - if (LOG.isTraceEnabled()) { - LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(), - record.offset(), record.key(), record.value()); - } - Exchange exchange = createKafkaExchange(record); - - propagateHeaders(record, exchange, kafkaConsumer.getEndpoint().getConfiguration()); - - // if not auto commit then we have additional - // information on the exchange - if (!isAutoCommitEnabled()) { - exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, - !recordIterator.hasNext()); - } - if (kafkaConsumer.getEndpoint().getConfiguration().isAllowManualCommit()) { - // allow Camel users to access the Kafka - // consumer API to be able to do for example - // manual commits - KafkaManualCommit manual - = kafkaConsumer.getEndpoint().getComponent().getKafkaManualCommitFactory() - .newInstance(exchange, consumer, topicName, threadId, - offsetRepository, partition, record.offset()); - 
exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual); - } - // if commit management is on user side give additional info for the end of poll loop - if (!isAutoCommitEnabled() - || kafkaConsumer.getEndpoint().getConfiguration().isAllowManualCommit()) { - exchange.getIn().setHeader(KafkaConstants.LAST_POLL_RECORD, - !recordIterator.hasNext() && !partitionIterator.hasNext()); - } - - try { - kafkaConsumer.getProcessor().process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - - if (exchange.getException() != null) { - // processing failed due to an unhandled - // exception, what should we do - if (kafkaConsumer.getEndpoint().getConfiguration().isBreakOnFirstError()) { - // we are failing and we should break out - LOG.warn( - "Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", - exchange, topicName, partitionLastOffset, exchange.getException()); - // force commit so we resume on next poll where we failed - commitOffset(offsetRepository, partition, partitionLastOffset, false, true); - // continue to next partition - breakOnErrorHit = true; - } else { - // will handle/log the exception and - // then continue to next - kafkaConsumer.getExceptionHandler().handleException("Error during processing", exchange, - exchange.getException()); - } - } else { - // record was success so remember its offset - partitionLastOffset = record.offset(); - // lastOffsetProcessed would be used by - // Consumer re-balance listener to preserve - // offset state upon partition revoke - lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset); - } - - // success so release the exchange - kafkaConsumer.releaseExchange(exchange, false); - } - - if (!breakOnErrorHit) { - // all records processed from partition so commit them - commitOffset(offsetRepository, partition, partitionLastOffset, false, false); - } - } + if (allRecords.isEmpty()) { + LOG.debug("No records received when 
polling ... (continuing)"); } - if (breakOnErrorHit) { - // force re-connect - reconnect.set(true); - retry.set(false); // to close the current consumer - } + partitionLastOffset = processPolledRecords(allRecords); } - if (!reconnect.get()) { - if (isAutoCommitEnabled()) { - if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName); - consumer.commitAsync(); - } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName); - consumer.commitSync(); - } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commit on stop {} from topic {} is disabled (none)", threadId, topicName); - } - } + if (!isReconnecting()) { + LOG.debug("Not reconnecting, check whether to auto-commit or not ..."); + commit(); } - LOG.info("Unsubscribing {} from topic {}", threadId, topicName); - // we are unsubscribing so do not re connect - unsubscribing = true; - consumer.unsubscribe(); + safeUnsubscribe(); } catch (InterruptException e) { kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e); + commit(); + LOG.info("Unsubscribing {} from topic {}", threadId, topicName); - consumer.unsubscribe(); + safeUnsubscribe(); Thread.currentThread().interrupt(); + } catch (WakeupException e) { + // This is normal: it raises this exception when calling the wakeUp (which happens when we stop) + LOG.info("The kafka consumer was woken up while polling"); Review comment: Maybe add a bit more details such as the threadId, topicName etc. 
########## File path: components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java ########## @@ -71,361 +75,322 @@ this.kafkaProps = kafkaProps; } + void preInit() { + createConsumer(); + + StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository(); + String seekPolicy = kafkaConsumer.getEndpoint().getConfiguration().getSeekTo(); + resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer, offsetRepository, seekPolicy); + } + @Override public void run() { - boolean first = true; - final AtomicBoolean reTry = new AtomicBoolean(true); - final AtomicBoolean reConnect = new AtomicBoolean(true); + if (!isKafkaConsumerRunnable()) { + return; + } - while (reTry.get() || reConnect.get()) { + if (isRetrying() || isReconnecting()) { try { - if (first || reConnect.get()) { + if (isReconnecting()) { // re-initialize on re-connect so we have a fresh consumer - doInit(); + createConsumer(); } } catch (Exception e) { // ensure this is logged so users can see the problem LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e); } - if (!first) { - // skip one poll timeout before trying again - long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); - String prefix = reConnect.get() ? 
"Reconnecting" : "Retrying"; - LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay); - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - boolean stopping = kafkaConsumer.getEndpoint().getCamelContext().isStopping(); - if (stopping) { - LOG.info( - "CamelContext is stopping so terminating KafkaConsumer thread: {} receiving from topic: {}", - threadId, topicName); - return; - } - } - } - - first = false; - - if (isCloseable()) { - LOG.debug("Closing consumer {}", threadId); - IOHelper.close(consumer); - return; - } + long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); + String prefix = isReconnecting() ? "Reconnecting" : "Retrying"; + LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay); - // doRun keeps running until we either shutdown or is told to re-connect - doRun(reTry, reConnect); + doRun(); } LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName); + safeUnsubscribe(); + IOHelper.close(consumer); } - private boolean isCloseable() { - return !kafkaConsumer.isRunAllowed() || kafkaConsumer.isStoppingOrStopped() || kafkaConsumer.isSuspendingOrSuspended(); - } - - void preInit() { - doInit(); - } - - protected void doInit() { + protected void createConsumer() { // create consumer ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader(); try { - // Kafka uses reflection for loading authentication settings, - // use its classloader + // Kafka uses reflection for loading authentication settings, use its classloader Thread.currentThread() .setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader()); - // this may throw an exception if something is wrong with kafka - // consumer + + // this may throw an exception if something is wrong with kafka consumer this.consumer = kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(kafkaProps); } finally { 
Thread.currentThread().setContextClassLoader(threadClassLoader); } } - @SuppressWarnings("unchecked") - protected void doRun(AtomicBoolean retry, AtomicBoolean reconnect) { - if (reconnect.get()) { + protected void doRun() { + if (isReconnecting()) { + subscribe(); + // on first run or reconnecting - doReconnectRun(); - // set reconnect to false as its done now - reconnect.set(false); - // set retry to true to continue polling - retry.set(true); - } - // polling - doPollRun(retry, reconnect); - } + resume(); - protected void doReconnectRun() { - if (topicPattern != null) { - LOG.info("Subscribing {} to topic pattern {}", threadId, topicName); - consumer.subscribe(topicPattern, this); - } else { - LOG.info("Subscribing {} to topic {}", threadId, topicName); - consumer.subscribe(Arrays.asList(topicName.split(",")), this); - } + // set reconnect to false as the connection and resume is done at this point + setReconnect(false); - StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository(); - if (offsetRepository != null) { - resumeFromOffsetRepository(offsetRepository); - } else if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo() != null) { - resumeFromSeekPolicy(); + // set retry to true to continue polling + setRetry(true); } - } - private void resumeFromOffsetRepository(StateRepository<String, String> offsetRepository) { - for (TopicPartition topicPartition : (Set<TopicPartition>) consumer.assignment()) { - String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition)); - if (offsetState != null && !offsetState.isEmpty()) { - resumeFromOffset(topicPartition, offsetState); - } - } + // start polling + startPolling(); } - private void resumeFromOffset(TopicPartition topicPartition, String offsetState) { - // The state contains the last read offset, so you need to seek from the next one - long offset = deserializeOffsetValue(offsetState) + 1; - LOG.debug("Resuming partition {} 
from offset {} from state", topicPartition.partition(), offset); - consumer.seek(topicPartition, offset); + protected void resume() { + resumeStrategy.resume(); } - private void resumeFromSeekPolicy() { - if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo().equals("beginning")) { - LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName); - // This poll to ensure we have an assigned partition - // otherwise seek won't work - consumer.poll(Duration.ofMillis(100)); - consumer.seekToBeginning(consumer.assignment()); - } else if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo().equals("end")) { - LOG.debug("{} is seeking to the end on topic {}", threadId, topicName); - // This poll to ensures we have an assigned partition - // otherwise seek won't work - consumer.poll(Duration.ofMillis(100)); - consumer.seekToEnd(consumer.assignment()); + private void subscribe() { + PartitionAssignmentListener listener = new PartitionAssignmentListener( + threadId, topicName, + kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset, this::isRunnable); + + if (topicPattern != null) { + LOG.info("Subscribing {} to topic pattern {}", threadId, topicName); + consumer.subscribe(topicPattern, listener); + } else { + LOG.info("Subscribing {} to topic {}", threadId, topicName); + consumer.subscribe(Arrays.asList(topicName.split(",")), listener); } } - protected void doPollRun(AtomicBoolean retry, AtomicBoolean reconnect) { - StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository(); - - // allow to re-connect thread in case we use that to retry failed messages - boolean unsubscribing = false; - - TopicPartition partition = null; + protected void startPolling() { long partitionLastOffset = -1; try { - while (kafkaConsumer.isRunAllowed() && !kafkaConsumer.isStoppingOrStopped() - && !kafkaConsumer.isSuspendingOrSuspended() - && retry.get() && !reconnect.get()) { - - // 
flag to break out processing on the first exception - boolean breakOnErrorHit = false; + while (isKafkaConsumerRunnable() && isRetrying() && !isReconnecting()) { long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs); ConsumerRecords<Object, Object> allRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); - - Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator(); - while (partitionIterator.hasNext()) { - partition = partitionIterator.next(); - partitionLastOffset = -1; - - Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator(); - LOG.debug("Records count {} received for partition {}", allRecords.records(partition).size(), - partition); - if (!breakOnErrorHit && recordIterator.hasNext()) { - - while (!breakOnErrorHit && recordIterator.hasNext()) { - ConsumerRecord<Object, Object> record = recordIterator.next(); - if (LOG.isTraceEnabled()) { - LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(), - record.offset(), record.key(), record.value()); - } - Exchange exchange = createKafkaExchange(record); - - propagateHeaders(record, exchange, kafkaConsumer.getEndpoint().getConfiguration()); - - // if not auto commit then we have additional - // information on the exchange - if (!isAutoCommitEnabled()) { - exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, - !recordIterator.hasNext()); - } - if (kafkaConsumer.getEndpoint().getConfiguration().isAllowManualCommit()) { - // allow Camel users to access the Kafka - // consumer API to be able to do for example - // manual commits - KafkaManualCommit manual - = kafkaConsumer.getEndpoint().getComponent().getKafkaManualCommitFactory() - .newInstance(exchange, consumer, topicName, threadId, - offsetRepository, partition, record.offset()); - 
exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual); - } - // if commit management is on user side give additional info for the end of poll loop - if (!isAutoCommitEnabled() - || kafkaConsumer.getEndpoint().getConfiguration().isAllowManualCommit()) { - exchange.getIn().setHeader(KafkaConstants.LAST_POLL_RECORD, - !recordIterator.hasNext() && !partitionIterator.hasNext()); - } - - try { - kafkaConsumer.getProcessor().process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - - if (exchange.getException() != null) { - // processing failed due to an unhandled - // exception, what should we do - if (kafkaConsumer.getEndpoint().getConfiguration().isBreakOnFirstError()) { - // we are failing and we should break out - LOG.warn( - "Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", - exchange, topicName, partitionLastOffset, exchange.getException()); - // force commit so we resume on next poll where we failed - commitOffset(offsetRepository, partition, partitionLastOffset, false, true); - // continue to next partition - breakOnErrorHit = true; - } else { - // will handle/log the exception and - // then continue to next - kafkaConsumer.getExceptionHandler().handleException("Error during processing", exchange, - exchange.getException()); - } - } else { - // record was success so remember its offset - partitionLastOffset = record.offset(); - // lastOffsetProcessed would be used by - // Consumer re-balance listener to preserve - // offset state upon partition revoke - lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset); - } - - // success so release the exchange - kafkaConsumer.releaseExchange(exchange, false); - } - - if (!breakOnErrorHit) { - // all records processed from partition so commit them - commitOffset(offsetRepository, partition, partitionLastOffset, false, false); - } - } + if (allRecords.isEmpty()) { + LOG.debug("No records received when 
polling ... (continuing)"); } - if (breakOnErrorHit) { - // force re-connect - reconnect.set(true); - retry.set(false); // to close the current consumer - } + partitionLastOffset = processPolledRecords(allRecords); } - if (!reconnect.get()) { - if (isAutoCommitEnabled()) { - if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName); - consumer.commitAsync(); - } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName); - consumer.commitSync(); - } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commit on stop {} from topic {} is disabled (none)", threadId, topicName); - } - } + if (!isReconnecting()) { + LOG.debug("Not reconnecting, check whether to auto-commit or not ..."); + commit(); } - LOG.info("Unsubscribing {} from topic {}", threadId, topicName); - // we are unsubscribing so do not re connect - unsubscribing = true; - consumer.unsubscribe(); + safeUnsubscribe(); } catch (InterruptException e) { kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e); + commit(); + LOG.info("Unsubscribing {} from topic {}", threadId, topicName); - consumer.unsubscribe(); + safeUnsubscribe(); Thread.currentThread().interrupt(); + } catch (WakeupException e) { + // This is normal: it raises this exception when calling the wakeUp (which happens when we stop) + LOG.info("The kafka consumer was woken up while polling"); + safeUnsubscribe(); } catch (Exception e) { if (LOG.isDebugEnabled()) { - LOG.debug("Exception caught while polling " + threadId + " from kafka topic " + topicName - + " at offset " + lastProcessedOffset + ". 
Deciding what to do.", - e); - } - if (unsubscribing) { - // some kind of error in kafka, it may happen during unsubscribing - kafkaConsumer.getExceptionHandler().handleException( - "Error unsubscribing " + threadId + " from kafka topic " + topicName, - e); + LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}", + e.getClass().getName(), threadId, topicName, lastProcessedOffset, e.getMessage(), e); } else { - PollOnError onError = pollExceptionStrategy.handleException(e); - if (PollOnError.RETRY == onError) { - LOG.warn( - "{} consuming {} from topic {} causedby {}. Will attempt again polling the same message (stacktrace in DEBUG logging level)", - e.getClass().getName(), threadId, topicName, e.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "KafkaException consuming {} from topic {} causedby {}. Will attempt again polling the same message", - threadId, topicName, e.getMessage(), e); - } - // consumer retry the same message again - retry.set(true); - } else if (PollOnError.RECONNECT == onError) { - LOG.warn( - "{} consuming {} from topic {} causedby {}. Will attempt to re-connect on next run (stacktrace in DEBUG logging level)", - e.getClass().getName(), threadId, topicName, e.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "{} consuming {} from topic {} causedby {}. Will attempt to re-connect on next run", - e.getClass().getName(), threadId, topicName, e.getMessage(), e); - } - // re-connect so the consumer can try the same message again - reconnect.set(true); - retry.set(false); // to close the current consumer - } else if (PollOnError.ERROR_HANDLER == onError) { - // use bridge error handler to route with exception - bridge.handleException(e); - // skip this poison message and seek to next message - seekToNextOffset(partitionLastOffset); - } else if (PollOnError.DISCARD == onError) { - // discard message - LOG.warn( - "{} consuming {} from topic {} causedby {}. 
Will discard the message and continue to poll the next message (stracktrace in DEBUG logging level).", - e.getClass().getName(), threadId, topicName, e.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "{} consuming {} from topic {} causedby {}. Will discard the message and continue to poll the next message.", - e.getClass().getName(), threadId, topicName, e.getMessage(), e); - } - // skip this poison message and seek to next message - seekToNextOffset(partitionLastOffset); - } else if (PollOnError.STOP == onError) { - // stop and terminate consumer - LOG.warn( - "{} consuming {} from topic {} causedby {}. Will stop consumer (stacktrace in DEBUG logging level).", - e.getClass().getName(), threadId, topicName, e.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "{} consuming {} from topic {} causedby {}. Will stop consumer.", - e.getClass().getName(), threadId, topicName, e.getMessage(), e); - } - retry.set(false); - reconnect.set(false); - } + LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}", + e.getClass().getName(), threadId, topicName, lastProcessedOffset, e.getMessage()); } + + handleAccordingToStrategy(partitionLastOffset, e); } finally { // only close if not retry - if (!retry.get()) { + if (!isRetrying()) { LOG.debug("Closing consumer {}", threadId); IOHelper.close(consumer); } } } + private void handleAccordingToStrategy(long partitionLastOffset, Exception e) { + PollOnError onError = pollExceptionStrategy.handleException(e); + if (PollOnError.RETRY == onError) { + handlePollRetry(); + } else if (PollOnError.RECONNECT == onError) { + handlePollReconnect(); + } else if (PollOnError.ERROR_HANDLER == onError) { + handlePollErrorHandler(partitionLastOffset, e); + } else if (PollOnError.DISCARD == onError) { + handlePollDiscard(partitionLastOffset); + } else if (PollOnError.STOP == onError) { + handlePollStop(); + } + } + + private void safeUnsubscribe() { + try { + consumer.unsubscribe(); + } catch 
(Exception e) { + kafkaConsumer.getExceptionHandler().handleException( + "Error unsubscribing " + threadId + " from kafka topic " + topicName, + e); + } + } + + private void commit() { + if (isAutoCommitEnabled()) { + if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { + LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName); + consumer.commitAsync(); + } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { + LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName); + consumer.commitSync(); + } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { + LOG.info("Auto commit on stop {} from topic {} is disabled (none)", threadId, topicName); + } + } + } + + private void handlePollStop() { + // stop and terminate consumer + LOG.warn("Requesting the consumer to stop based on polling exception strategy"); + + setRetry(false); + setReconnect(false); + } + + private void handlePollDiscard(long partitionLastOffset) { + LOG.warn("Requesting the consumer to discard the message and continue to the next based on polling exception strategy"); + + // skip this poison message and seek to next message + seekToNextOffset(partitionLastOffset); + } + + private void handlePollErrorHandler(long partitionLastOffset, Exception e) { + LOG.warn("Deferring processing to the exception handler based on polling exception strategy"); + + // use bridge error handler to route with exception + bridge.handleException(e); + // skip this poison message and seek to next message + seekToNextOffset(partitionLastOffset); + } + + private void handlePollReconnect() { + LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy"); + + // re-connect so the consumer can try the same message again + setReconnect(true); + + // to close the current consumer + setRetry(false); + } + + private void handlePollRetry() 
{ + LOG.warn("Requesting the consumer to retry polling the same message based on polling exception strategy"); + + // consumer retry the same message again + setRetry(true); + } + + private boolean isKafkaConsumerRunnable() { + return kafkaConsumer.isRunAllowed() && !kafkaConsumer.isStoppingOrStopped() + && !kafkaConsumer.isSuspendingOrSuspended(); + } + + private boolean isRunnable() { + return kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !kafkaConsumer.isRunAllowed(); + } + + private long processPolledRecords(ConsumerRecords<Object, Object> allRecords) { + long partitionLastOffset = -1; + + LOG.debug("Record count {} to process", allRecords.count()); Review comment: You can add isDebugEnabled to avoid the count() call ########## File path: components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java ########## @@ -71,361 +75,322 @@ this.kafkaProps = kafkaProps; } + void preInit() { + createConsumer(); + + StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository(); + String seekPolicy = kafkaConsumer.getEndpoint().getConfiguration().getSeekTo(); + resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer, offsetRepository, seekPolicy); + } + @Override public void run() { - boolean first = true; - final AtomicBoolean reTry = new AtomicBoolean(true); - final AtomicBoolean reConnect = new AtomicBoolean(true); + if (!isKafkaConsumerRunnable()) { + return; + } - while (reTry.get() || reConnect.get()) { + if (isRetrying() || isReconnecting()) { try { - if (first || reConnect.get()) { + if (isReconnecting()) { // re-initialize on re-connect so we have a fresh consumer - doInit(); + createConsumer(); } } catch (Exception e) { // ensure this is logged so users can see the problem LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e); } - if (!first) { - // skip one poll timeout before trying again - long delay 
= kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); - String prefix = reConnect.get() ? "Reconnecting" : "Retrying"; - LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay); - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - boolean stopping = kafkaConsumer.getEndpoint().getCamelContext().isStopping(); - if (stopping) { - LOG.info( - "CamelContext is stopping so terminating KafkaConsumer thread: {} receiving from topic: {}", - threadId, topicName); - return; - } - } - } - - first = false; - - if (isCloseable()) { - LOG.debug("Closing consumer {}", threadId); - IOHelper.close(consumer); - return; - } + long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); + String prefix = isReconnecting() ? "Reconnecting" : "Retrying"; + LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay); - // doRun keeps running until we either shutdown or is told to re-connect - doRun(reTry, reConnect); + doRun(); } LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName); + safeUnsubscribe(); + IOHelper.close(consumer); } - private boolean isCloseable() { - return !kafkaConsumer.isRunAllowed() || kafkaConsumer.isStoppingOrStopped() || kafkaConsumer.isSuspendingOrSuspended(); - } - - void preInit() { - doInit(); - } - - protected void doInit() { + protected void createConsumer() { // create consumer ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader(); try { - // Kafka uses reflection for loading authentication settings, - // use its classloader + // Kafka uses reflection for loading authentication settings, use its classloader Thread.currentThread() .setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader()); - // this may throw an exception if something is wrong with kafka - // consumer + + // this may throw an exception if something is wrong with kafka consumer this.consumer = 
kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(kafkaProps); } finally { Thread.currentThread().setContextClassLoader(threadClassLoader); } } - @SuppressWarnings("unchecked") - protected void doRun(AtomicBoolean retry, AtomicBoolean reconnect) { - if (reconnect.get()) { + protected void doRun() { + if (isReconnecting()) { + subscribe(); + // on first run or reconnecting - doReconnectRun(); - // set reconnect to false as its done now - reconnect.set(false); - // set retry to true to continue polling - retry.set(true); - } - // polling - doPollRun(retry, reconnect); - } + resume(); - protected void doReconnectRun() { - if (topicPattern != null) { - LOG.info("Subscribing {} to topic pattern {}", threadId, topicName); - consumer.subscribe(topicPattern, this); - } else { - LOG.info("Subscribing {} to topic {}", threadId, topicName); - consumer.subscribe(Arrays.asList(topicName.split(",")), this); - } + // set reconnect to false as the connection and resume is done at this point + setReconnect(false); - StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository(); - if (offsetRepository != null) { - resumeFromOffsetRepository(offsetRepository); - } else if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo() != null) { - resumeFromSeekPolicy(); + // set retry to true to continue polling + setRetry(true); } - } - private void resumeFromOffsetRepository(StateRepository<String, String> offsetRepository) { - for (TopicPartition topicPartition : (Set<TopicPartition>) consumer.assignment()) { - String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition)); - if (offsetState != null && !offsetState.isEmpty()) { - resumeFromOffset(topicPartition, offsetState); - } - } + // start polling + startPolling(); } - private void resumeFromOffset(TopicPartition topicPartition, String offsetState) { - // The state contains the last read offset, so you need to seek from the next one - 
long offset = deserializeOffsetValue(offsetState) + 1; - LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset); - consumer.seek(topicPartition, offset); + protected void resume() { + resumeStrategy.resume(); } - private void resumeFromSeekPolicy() { - if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo().equals("beginning")) { - LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName); - // This poll to ensure we have an assigned partition - // otherwise seek won't work - consumer.poll(Duration.ofMillis(100)); - consumer.seekToBeginning(consumer.assignment()); - } else if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo().equals("end")) { - LOG.debug("{} is seeking to the end on topic {}", threadId, topicName); - // This poll to ensures we have an assigned partition - // otherwise seek won't work - consumer.poll(Duration.ofMillis(100)); - consumer.seekToEnd(consumer.assignment()); + private void subscribe() { + PartitionAssignmentListener listener = new PartitionAssignmentListener( + threadId, topicName, + kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset, this::isRunnable); + + if (topicPattern != null) { + LOG.info("Subscribing {} to topic pattern {}", threadId, topicName); + consumer.subscribe(topicPattern, listener); + } else { + LOG.info("Subscribing {} to topic {}", threadId, topicName); + consumer.subscribe(Arrays.asList(topicName.split(",")), listener); } } - protected void doPollRun(AtomicBoolean retry, AtomicBoolean reconnect) { - StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository(); - - // allow to re-connect thread in case we use that to retry failed messages - boolean unsubscribing = false; - - TopicPartition partition = null; + protected void startPolling() { long partitionLastOffset = -1; try { - while (kafkaConsumer.isRunAllowed() && !kafkaConsumer.isStoppingOrStopped() - && 
!kafkaConsumer.isSuspendingOrSuspended() - && retry.get() && !reconnect.get()) { - - // flag to break out processing on the first exception - boolean breakOnErrorHit = false; + while (isKafkaConsumerRunnable() && isRetrying() && !isReconnecting()) { long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs); ConsumerRecords<Object, Object> allRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); - - Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator(); - while (partitionIterator.hasNext()) { - partition = partitionIterator.next(); - partitionLastOffset = -1; - - Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator(); - LOG.debug("Records count {} received for partition {}", allRecords.records(partition).size(), - partition); - if (!breakOnErrorHit && recordIterator.hasNext()) { - - while (!breakOnErrorHit && recordIterator.hasNext()) { - ConsumerRecord<Object, Object> record = recordIterator.next(); - if (LOG.isTraceEnabled()) { - LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(), - record.offset(), record.key(), record.value()); - } - Exchange exchange = createKafkaExchange(record); - - propagateHeaders(record, exchange, kafkaConsumer.getEndpoint().getConfiguration()); - - // if not auto commit then we have additional - // information on the exchange - if (!isAutoCommitEnabled()) { - exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, - !recordIterator.hasNext()); - } - if (kafkaConsumer.getEndpoint().getConfiguration().isAllowManualCommit()) { - // allow Camel users to access the Kafka - // consumer API to be able to do for example - // manual commits - KafkaManualCommit manual - = kafkaConsumer.getEndpoint().getComponent().getKafkaManualCommitFactory() - .newInstance(exchange, consumer, topicName, threadId, - 
offsetRepository, partition, record.offset()); - exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual); - } - // if commit management is on user side give additional info for the end of poll loop - if (!isAutoCommitEnabled() - || kafkaConsumer.getEndpoint().getConfiguration().isAllowManualCommit()) { - exchange.getIn().setHeader(KafkaConstants.LAST_POLL_RECORD, - !recordIterator.hasNext() && !partitionIterator.hasNext()); - } - - try { - kafkaConsumer.getProcessor().process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - - if (exchange.getException() != null) { - // processing failed due to an unhandled - // exception, what should we do - if (kafkaConsumer.getEndpoint().getConfiguration().isBreakOnFirstError()) { - // we are failing and we should break out - LOG.warn( - "Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", - exchange, topicName, partitionLastOffset, exchange.getException()); - // force commit so we resume on next poll where we failed - commitOffset(offsetRepository, partition, partitionLastOffset, false, true); - // continue to next partition - breakOnErrorHit = true; - } else { - // will handle/log the exception and - // then continue to next - kafkaConsumer.getExceptionHandler().handleException("Error during processing", exchange, - exchange.getException()); - } - } else { - // record was success so remember its offset - partitionLastOffset = record.offset(); - // lastOffsetProcessed would be used by - // Consumer re-balance listener to preserve - // offset state upon partition revoke - lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset); - } - - // success so release the exchange - kafkaConsumer.releaseExchange(exchange, false); - } - - if (!breakOnErrorHit) { - // all records processed from partition so commit them - commitOffset(offsetRepository, partition, partitionLastOffset, false, false); - } - } + if 
(allRecords.isEmpty()) { + LOG.debug("No records received when polling ... (continuing)"); } - if (breakOnErrorHit) { - // force re-connect - reconnect.set(true); - retry.set(false); // to close the current consumer - } + partitionLastOffset = processPolledRecords(allRecords); } - if (!reconnect.get()) { - if (isAutoCommitEnabled()) { - if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName); - consumer.commitAsync(); - } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName); - consumer.commitSync(); - } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commit on stop {} from topic {} is disabled (none)", threadId, topicName); - } - } + if (!isReconnecting()) { + LOG.debug("Not reconnecting, check whether to auto-commit or not ..."); + commit(); } - LOG.info("Unsubscribing {} from topic {}", threadId, topicName); - // we are unsubscribing so do not re connect - unsubscribing = true; - consumer.unsubscribe(); + safeUnsubscribe(); } catch (InterruptException e) { kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e); + commit(); + LOG.info("Unsubscribing {} from topic {}", threadId, topicName); - consumer.unsubscribe(); + safeUnsubscribe(); Thread.currentThread().interrupt(); + } catch (WakeupException e) { + // This is normal: it raises this exception when calling the wakeUp (which happens when we stop) + LOG.info("The kafka consumer was woken up while polling"); + safeUnsubscribe(); } catch (Exception e) { if (LOG.isDebugEnabled()) { - LOG.debug("Exception caught while polling " + threadId + " from kafka topic " + topicName - + " at offset " + lastProcessedOffset + ". 
Deciding what to do.", - e); - } - if (unsubscribing) { - // some kind of error in kafka, it may happen during unsubscribing - kafkaConsumer.getExceptionHandler().handleException( - "Error unsubscribing " + threadId + " from kafka topic " + topicName, - e); + LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}", + e.getClass().getName(), threadId, topicName, lastProcessedOffset, e.getMessage(), e); } else { - PollOnError onError = pollExceptionStrategy.handleException(e); - if (PollOnError.RETRY == onError) { - LOG.warn( - "{} consuming {} from topic {} causedby {}. Will attempt again polling the same message (stacktrace in DEBUG logging level)", - e.getClass().getName(), threadId, topicName, e.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "KafkaException consuming {} from topic {} causedby {}. Will attempt again polling the same message", - threadId, topicName, e.getMessage(), e); - } - // consumer retry the same message again - retry.set(true); - } else if (PollOnError.RECONNECT == onError) { - LOG.warn( - "{} consuming {} from topic {} causedby {}. Will attempt to re-connect on next run (stacktrace in DEBUG logging level)", - e.getClass().getName(), threadId, topicName, e.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "{} consuming {} from topic {} causedby {}. Will attempt to re-connect on next run", - e.getClass().getName(), threadId, topicName, e.getMessage(), e); - } - // re-connect so the consumer can try the same message again - reconnect.set(true); - retry.set(false); // to close the current consumer - } else if (PollOnError.ERROR_HANDLER == onError) { - // use bridge error handler to route with exception - bridge.handleException(e); - // skip this poison message and seek to next message - seekToNextOffset(partitionLastOffset); - } else if (PollOnError.DISCARD == onError) { - // discard message - LOG.warn( - "{} consuming {} from topic {} causedby {}. 
Will discard the message and continue to poll the next message (stracktrace in DEBUG logging level).", - e.getClass().getName(), threadId, topicName, e.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "{} consuming {} from topic {} causedby {}. Will discard the message and continue to poll the next message.", - e.getClass().getName(), threadId, topicName, e.getMessage(), e); - } - // skip this poison message and seek to next message - seekToNextOffset(partitionLastOffset); - } else if (PollOnError.STOP == onError) { - // stop and terminate consumer - LOG.warn( - "{} consuming {} from topic {} causedby {}. Will stop consumer (stacktrace in DEBUG logging level).", - e.getClass().getName(), threadId, topicName, e.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "{} consuming {} from topic {} causedby {}. Will stop consumer.", - e.getClass().getName(), threadId, topicName, e.getMessage(), e); - } - retry.set(false); - reconnect.set(false); - } + LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}", + e.getClass().getName(), threadId, topicName, lastProcessedOffset, e.getMessage()); } + + handleAccordingToStrategy(partitionLastOffset, e); } finally { // only close if not retry - if (!retry.get()) { + if (!isRetrying()) { LOG.debug("Closing consumer {}", threadId); IOHelper.close(consumer); } } } + private void handleAccordingToStrategy(long partitionLastOffset, Exception e) { + PollOnError onError = pollExceptionStrategy.handleException(e); + if (PollOnError.RETRY == onError) { + handlePollRetry(); + } else if (PollOnError.RECONNECT == onError) { + handlePollReconnect(); + } else if (PollOnError.ERROR_HANDLER == onError) { + handlePollErrorHandler(partitionLastOffset, e); + } else if (PollOnError.DISCARD == onError) { + handlePollDiscard(partitionLastOffset); + } else if (PollOnError.STOP == onError) { + handlePollStop(); + } + } + + private void safeUnsubscribe() { + try { + consumer.unsubscribe(); + } catch 
(Exception e) { + kafkaConsumer.getExceptionHandler().handleException( + "Error unsubscribing " + threadId + " from kafka topic " + topicName, + e); + } + } + + private void commit() { + if (isAutoCommitEnabled()) { + if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { + LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName); + consumer.commitAsync(); + } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { + LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName); + consumer.commitSync(); + } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { + LOG.info("Auto commit on stop {} from topic {} is disabled (none)", threadId, topicName); + } + } + } + + private void handlePollStop() { + // stop and terminate consumer + LOG.warn("Requesting the consumer to stop based on polling exception strategy"); + + setRetry(false); + setReconnect(false); + } + + private void handlePollDiscard(long partitionLastOffset) { + LOG.warn("Requesting the consumer to discard the message and continue to the next based on polling exception strategy"); + + // skip this poison message and seek to next message + seekToNextOffset(partitionLastOffset); + } + + private void handlePollErrorHandler(long partitionLastOffset, Exception e) { + LOG.warn("Deferring processing to the exception handler based on polling exception strategy"); + + // use bridge error handler to route with exception + bridge.handleException(e); + // skip this poison message and seek to next message + seekToNextOffset(partitionLastOffset); + } + + private void handlePollReconnect() { + LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy"); + + // re-connect so the consumer can try the same message again + setReconnect(true); + + // to close the current consumer + setRetry(false); + } + + private void handlePollRetry() 
{ + LOG.warn("Requesting the consumer to retry polling the same message based on polling exception strategy"); + + // consumer retry the same message again + setRetry(true); + } + + private boolean isKafkaConsumerRunnable() { + return kafkaConsumer.isRunAllowed() && !kafkaConsumer.isStoppingOrStopped() + && !kafkaConsumer.isSuspendingOrSuspended(); + } + + private boolean isRunnable() { + return kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !kafkaConsumer.isRunAllowed(); + } + + private long processPolledRecords(ConsumerRecords<Object, Object> allRecords) { + long partitionLastOffset = -1; + + LOG.debug("Record count {} to process", allRecords.count()); + + // flag to break out processing on the first exception + boolean breakOnErrorHit = false; + Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator(); + + while (partitionIterator.hasNext()) { + partitionLastOffset = -1; + + TopicPartition partition = partitionIterator.next(); + + Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator(); + LOG.debug("Records count {} received for partition {}", allRecords.records(partition).size(), Review comment: Same here: guard this LOG.debug with isDebugEnabled to avoid the allRecords.records(partition).size() call when debug logging is off -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
