This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 40f51d1 (chores) camel-kafka: cleanups related to CAMEL-16949
40f51d1 is described below
commit 40f51d1cb83253407b859071085eed35464de1f1
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Sep 20 14:41:54 2021 +0200
(chores) camel-kafka: cleanups related to CAMEL-16949
- avoid rebuilding the processor for every polled record
- make the exception processor method private
- remove unused variables
- make the reconnect variable not volatile as it is unnecessary and
local to the thread
- make the retry variable not volatile as it is unnecessary
- avoid recreating the poll duration variable
- stop creating the lock in the inner loop and move it to the main loop
- remove the unnecessary thread ID parameter from some methods, as it is
  already available as a member variable of the record processors
- stop overwriting the local lastResult parameter
- fix an incorrect commit point (see the sketch below)
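As a rough illustration of the reshaped loop, here is a minimal, hypothetical Java sketch. PollLoopSketch, Processor, Result and pollFn are illustrative stand-ins for KafkaFetchRecords, KafkaRecordProcessor, ProcessResult and consumer.poll(); they are not the actual Camel classes. The sketch shows the lock and the loop invariants hoisted out of the per-record path, results returned rather than overwritten, and the commit performed once per fully processed partition:

    import java.time.Duration;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Function;

    class PollLoopSketch {
        private final ReentrantLock lock = new ReentrantLock();

        // illustrative stand-in for KafkaRecordProcessor.ProcessResult
        record Result(boolean breakOnErrorHit, long lastOffset) {}

        // illustrative stand-in for KafkaRecordProcessor
        interface Processor {
            Result process(String record, Result previous);
            void commitOffset(long offset);
        }

        void poll(Function<Duration, List<List<String>>> pollFn, Processor processor, long pollTimeoutMs) {
            lock.lock(); // taken once around the whole polling session, not per record batch
            try {
                // invariants hoisted out of the loop: created once, reused on every iteration
                Duration pollDuration = Duration.ofMillis(pollTimeoutMs);

                List<List<String>> partitions = pollFn.apply(pollDuration); // stands in for consumer.poll(pollDuration)
                for (List<String> partition : partitions) {
                    Result lastResult = new Result(false, -1);
                    for (String record : partition) {
                        if (lastResult.breakOnErrorHit()) {
                            break;
                        }
                        // a fresh result is returned; the previous one is not overwritten in place
                        lastResult = processor.process(record, lastResult);
                    }
                    if (!lastResult.breakOnErrorHit()) {
                        // commit once per fully processed partition, not once per record
                        processor.commitOffset(lastResult.lastOffset());
                    }
                }
            } finally {
                lock.unlock(); // always released, mirroring the patch's finally block
            }
        }
    }

Holding the lock for the whole session matches the intent of the comment added in the patch: a call to stop() or shutdown() cannot interleave with record processing, and offsets are committed only after all records of a partition were processed successfully.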
---
.../camel/component/kafka/KafkaFetchRecords.java | 67 +++++++++++-----------
.../consumer/support/KafkaRecordProcessor.java | 6 +-
2 files changed, 35 insertions(+), 38 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 720ee8d..e96dc6a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -32,7 +32,6 @@ import java.util.regex.Pattern;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.util.IOHelper;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -60,9 +59,8 @@ class KafkaFetchRecords implements Runnable {
     private final ReentrantLock lock = new ReentrantLock();
     private final AtomicBoolean stopping = new AtomicBoolean(false);
-    private volatile boolean retry = true;
-    private volatile boolean reconnect = true;
-    private ResumeStrategy resumeStrategy;
+    private boolean retry = true;
+    private boolean reconnect = true;
 
     KafkaFetchRecords(KafkaConsumer kafkaConsumer, PollExceptionStrategy pollExceptionStrategy,
                       BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id,
@@ -157,13 +155,22 @@ class KafkaFetchRecords implements Runnable {
         long partitionLastOffset = -1;
         try {
+            /*
+             * We lock the processing of the record to avoid raising a WakeUpException as a result to a call
+             * to stop() or shutdown().
+             */
+            lock.lock();
+
             long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
 
             LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs);
 
+            KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor();
+
+            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
             while (isKafkaConsumerRunnable() && isRetrying() && !isReconnecting()) {
-                ConsumerRecords<Object, Object> allRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs));
+                ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
 
-                partitionLastOffset = processPolledRecords(allRecords);
+                partitionLastOffset = processPolledRecords(allRecords, kafkaRecordProcessor);
             }
 
             if (!isReconnecting()) {
@@ -195,6 +202,8 @@ class KafkaFetchRecords implements Runnable {
             handleAccordingToStrategy(partitionLastOffset, e);
         } finally {
+            lock.unlock();
+
             // only close if not retry
             if (!isRetrying()) {
                 LOG.debug("Closing consumer {}", threadId);
@@ -292,7 +301,7 @@ class KafkaFetchRecords implements Runnable {
         return kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !kafkaConsumer.isRunAllowed();
     }
 
-    private long processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
+    private long processPolledRecords(ConsumerRecords<Object, Object> allRecords, KafkaRecordProcessor kafkaRecordProcessor) {
         logRecords(allRecords);
 
         Set<TopicPartition> partitions = allRecords.partitions();
@@ -309,24 +318,17 @@ class KafkaFetchRecords implements Runnable {
             logRecordsInPartition(partitionRecords, partition);
 
-            KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor();
-
-            try {
-                /*
-                 * We lock the processing of the record to avoid raising a WakeUpException as a result to a call
-                 * to stop() or shutdown().
-                 */
-                lock.lock();
-
-                while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
-                    ConsumerRecord<Object, Object> record = recordIterator.next();
+            while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
+                ConsumerRecord<Object, Object> record = recordIterator.next();
 
-                    lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
-                            kafkaRecordProcessor, record);
+                lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
+                        kafkaRecordProcessor, record);
+            }
 
-                }
-            } finally {
-                lock.unlock();
+            if (!lastResult.isBreakOnErrorHit()) {
+                LOG.debug("Committing offset on successful execution");
+                // all records processed from partition so commit them
+                kafkaRecordProcessor.commitOffset(partition, lastResult.getPartitionLastOffset(), false, false);
             }
         }
@@ -357,7 +359,7 @@
             TopicPartition partition,
             boolean partitionHasNext,
             boolean recordHasNext,
-            KafkaRecordProcessor.ProcessResult lastResult,
+            final KafkaRecordProcessor.ProcessResult lastResult,
             KafkaRecordProcessor kafkaRecordProcessor,
             ConsumerRecord<Object, Object> record) {
@@ -365,23 +367,18 @@
         Exchange exchange = kafkaConsumer.createExchange(false);
 
-        lastResult = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
-                recordHasNext, record, lastResult, kafkaConsumer.getExceptionHandler());
+        KafkaRecordProcessor.ProcessResult currentResult
+                = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
+                        recordHasNext, record, lastResult, kafkaConsumer.getExceptionHandler());
 
-        if (!lastResult.isBreakOnErrorHit()) {
-            lastProcessedOffset.put(serializeOffsetKey(partition), lastResult.getPartitionLastOffset());
+        if (!currentResult.isBreakOnErrorHit()) {
+            lastProcessedOffset.put(serializeOffsetKey(partition), currentResult.getPartitionLastOffset());
         }
 
         // success so release the exchange
         kafkaConsumer.releaseExchange(exchange, false);
 
-        if (!lastResult.isBreakOnErrorHit()) {
-            LOG.debug("Committing offset on successful execution");
-            // all records processed from partition so commit them
-            kafkaRecordProcessor.commitOffset(partition, lastResult.getPartitionLastOffset(), false, false,
-                    threadId);
-        }
-
-        return lastResult;
+        return currentResult;
     }
 
     private void logRecord(ConsumerRecord<Object, Object> record) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index f1a70b0..734861e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -159,7 +159,7 @@ public class KafkaRecordProcessor {
         return new ProcessResult(false, record.offset());
     }
 
-    public boolean processException(
+    private boolean processException(
             Exchange exchange, TopicPartition partition, long partitionLastOffset,
             ExceptionHandler exceptionHandler) {
@@ -170,7 +170,7 @@ public class KafkaRecordProcessor {
             LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset);
 
             // force commit, so we resume on next poll where we failed
-            commitOffset(partition, partitionLastOffset, false, true, threadId);
+            commitOffset(partition, partitionLastOffset, false, true);
 
             // continue to next partition
             return true;
@@ -183,7 +183,7 @@
     }
 
     public void commitOffset(
-            TopicPartition partition, long partitionLastOffset, boolean stopping, boolean forceCommit, String threadId) {
+            TopicPartition partition, long partitionLastOffset, boolean stopping, boolean forceCommit) {
         commitOffset(configuration, consumer, partition, partitionLastOffset, stopping, forceCommit, threadId);
     }