This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push:
new 5dd3c6eb2c7 CAMEL-22004: camel-kafka - Pause action must be re-issued
when new partitions are assigned to preserve the paused state of the Consumer
assignment (#17831)
5dd3c6eb2c7 is described below
commit 5dd3c6eb2c742dad273a44d0e05a2f334e89b713
Author: Pavel Bořík <[email protected]>
AuthorDate: Tue Apr 22 16:43:51 2025 +0200
CAMEL-22004: camel-kafka - Pause action must be re-issued when new
partitions are assigned to preserve the paused state of the Consumer assignment
(#17831)
---
.../camel/component/kafka/KafkaFetchRecords.java | 48 +++++++++++++++++-----
1 file changed, 38 insertions(+), 10 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 3534fccf3f4..a3b843b5721 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -17,8 +17,10 @@
package org.apache.camel.component.kafka;
import java.time.Duration;
+import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@@ -96,7 +98,7 @@ public class KafkaFetchRecords implements Runnable {
private volatile long currentBackoffInterval;
private volatile boolean reconnect; // The reconnect must be false at init
(this is the policy whether to reconnect).
private volatile boolean connected; // this is the state (connected or not)
- private volatile State state = State.RUNNING;
+ private final AtomicReference<State> state = new
AtomicReference<>(State.RUNNING);
private final DevConsoleMetricsCollector metricsCollector;
@@ -325,7 +327,7 @@ public class KafkaFetchRecords implements Runnable {
LOG.info("Searching for a custom subscribe adapter on the registry");
final SubscribeAdapter adapter = resolveSubscribeAdapter(camelContext);
- adapter.subscribe(consumer, listener, topicInfo);
+ adapter.subscribe(consumer, new
PausePreservingRebalanceListener(listener), topicInfo);
}
private static SubscribeAdapter resolveSubscribeAdapter(CamelContext
camelContext) {
@@ -375,7 +377,7 @@ public class KafkaFetchRecords implements Runnable {
updateTaskState();
// when breakOnFirstError we want to unsubscribe from Kafka
- if (result != null && result.isBreakOnErrorHit() &&
!this.state.equals(State.PAUSED)) {
+ if (result != null && result.isBreakOnErrorHit() &&
!this.state.get().equals(State.PAUSED)) {
LOG.debug("We hit an error ... setting flags to force
reconnect");
// force re-connect
setReconnect(true);
@@ -439,11 +441,11 @@ public class KafkaFetchRecords implements Runnable {
}
private void updateTaskState() {
- switch (state) {
+ switch (state.get()) {
case PAUSE_REQUESTED:
LOG.info("Pausing the consumer as a response to a pause
request");
consumer.pause(consumer.assignment());
- state = State.PAUSED;
+ state.set(State.PAUSED);
break;
case RESUME_REQUESTED:
LOG.info("Resuming the consumer as a response to a resume
request");
@@ -458,7 +460,7 @@ public class KafkaFetchRecords implements Runnable {
});
}
consumer.resume(consumer.assignment());
- state = State.RUNNING;
+ state.set(State.RUNNING);
break;
default:
break;
@@ -572,7 +574,7 @@ public class KafkaFetchRecords implements Runnable {
public boolean isPaused() {
// cannot use consumer directly as you can have
ConcurrentModificationException as kafka client does not permit
// multiple threads to use the client consumer, so we check the state
only
- return state == State.PAUSED;
+ return state.get() == State.PAUSED;
}
public void setConnected(boolean connected) {
@@ -639,7 +641,7 @@ public class KafkaFetchRecords implements Runnable {
*/
public void pause() {
LOG.info("A pause request was issued and the consumer thread will
pause after current processing has finished");
- state = State.PAUSE_REQUESTED;
+ state.set(State.PAUSE_REQUESTED);
}
/*
@@ -648,7 +650,7 @@ public class KafkaFetchRecords implements Runnable {
*/
public void resume() {
LOG.info("A resume request was issued and the consumer thread will
resume after current processing has finished");
- state = State.RESUME_REQUESTED;
+ state.set(State.RESUME_REQUESTED);
}
private synchronized void setLastError(Exception lastError) {
@@ -664,6 +666,32 @@ public class KafkaFetchRecords implements Runnable {
}
String getState() {
- return state.name();
+ return state.get().name();
+ }
+
+ private class PausePreservingRebalanceListener implements
ConsumerRebalanceListener {
+ private final ConsumerRebalanceListener delegate;
+
+ PausePreservingRebalanceListener(ConsumerRebalanceListener delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
+ delegate.onPartitionsRevoked(partitions);
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ if (state.compareAndSet(State.PAUSED, State.PAUSE_REQUESTED)) {
+ LOG.debug("Partitions were assigned while paused, the consumer
will be re-paused");
+ }
+ delegate.onPartitionsAssigned(partitions);
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions) {
+ delegate.onPartitionsLost(partitions);
+ }
}
}