This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.8.x by this push:
     new 5dd3c6eb2c7 CAMEL-22004: camel-kafka - Pause action must be re-issued 
when new partitions are assigned to preserve the paused state of the Consumer 
assignment (#17831)
5dd3c6eb2c7 is described below

commit 5dd3c6eb2c742dad273a44d0e05a2f334e89b713
Author: Pavel Bořík <[email protected]>
AuthorDate: Tue Apr 22 16:43:51 2025 +0200

    CAMEL-22004: camel-kafka - Pause action must be re-issued when new 
partitions are assigned to preserve the paused state of the Consumer assignment 
(#17831)
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 48 +++++++++++++++++-----
 1 file changed, 38 insertions(+), 10 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 3534fccf3f4..a3b843b5721 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -17,8 +17,10 @@
 package org.apache.camel.component.kafka;
 
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
@@ -96,7 +98,7 @@ public class KafkaFetchRecords implements Runnable {
     private volatile long currentBackoffInterval;
     private volatile boolean reconnect; // The reconnect must be false at init 
(this is the policy whether to reconnect).
     private volatile boolean connected; // this is the state (connected or not)
-    private volatile State state = State.RUNNING;
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.RUNNING);
 
     private final DevConsoleMetricsCollector metricsCollector;
 
@@ -325,7 +327,7 @@ public class KafkaFetchRecords implements Runnable {
         LOG.info("Searching for a custom subscribe adapter on the registry");
         final SubscribeAdapter adapter = resolveSubscribeAdapter(camelContext);
 
-        adapter.subscribe(consumer, listener, topicInfo);
+        adapter.subscribe(consumer, new 
PausePreservingRebalanceListener(listener), topicInfo);
     }
 
     private static SubscribeAdapter resolveSubscribeAdapter(CamelContext 
camelContext) {
@@ -375,7 +377,7 @@ public class KafkaFetchRecords implements Runnable {
                 updateTaskState();
 
                 // when breakOnFirstError we want to unsubscribe from Kafka
-                if (result != null && result.isBreakOnErrorHit() && 
!this.state.equals(State.PAUSED)) {
+                if (result != null && result.isBreakOnErrorHit() && 
!this.state.get().equals(State.PAUSED)) {
                     LOG.debug("We hit an error ... setting flags to force 
reconnect");
                     // force re-connect
                     setReconnect(true);
@@ -439,11 +441,11 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     private void updateTaskState() {
-        switch (state) {
+        switch (state.get()) {
             case PAUSE_REQUESTED:
                 LOG.info("Pausing the consumer as a response to a pause 
request");
                 consumer.pause(consumer.assignment());
-                state = State.PAUSED;
+                state.set(State.PAUSED);
                 break;
             case RESUME_REQUESTED:
                 LOG.info("Resuming the consumer as a response to a resume 
request");
@@ -458,7 +460,7 @@ public class KafkaFetchRecords implements Runnable {
                     });
                 }
                 consumer.resume(consumer.assignment());
-                state = State.RUNNING;
+                state.set(State.RUNNING);
                 break;
             default:
                 break;
@@ -572,7 +574,7 @@ public class KafkaFetchRecords implements Runnable {
     public boolean isPaused() {
         // cannot use consumer directly as you can have 
ConcurrentModificationException as kafka client does not permit
         // multiple threads to use the client consumer, so we check the state 
only
-        return state == State.PAUSED;
+        return state.get() == State.PAUSED;
     }
 
     public void setConnected(boolean connected) {
@@ -639,7 +641,7 @@ public class KafkaFetchRecords implements Runnable {
      */
     public void pause() {
         LOG.info("A pause request was issued and the consumer thread will 
pause after current processing has finished");
-        state = State.PAUSE_REQUESTED;
+        state.set(State.PAUSE_REQUESTED);
     }
 
     /*
@@ -648,7 +650,7 @@ public class KafkaFetchRecords implements Runnable {
      */
     public void resume() {
         LOG.info("A resume request was issued and the consumer thread will 
resume after current processing has finished");
-        state = State.RESUME_REQUESTED;
+        state.set(State.RESUME_REQUESTED);
     }
 
     private synchronized void setLastError(Exception lastError) {
@@ -664,6 +666,32 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     String getState() {
-        return state.name();
+        return state.get().name();
+    }
+
+    private class PausePreservingRebalanceListener implements 
ConsumerRebalanceListener {
+        private final ConsumerRebalanceListener delegate;
+
+        PausePreservingRebalanceListener(ConsumerRebalanceListener delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
+            delegate.onPartitionsRevoked(partitions);
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+            if (state.compareAndSet(State.PAUSED, State.PAUSE_REQUESTED)) {
+                LOG.debug("Partitions were assigned while paused, the consumer 
will be re-paused");
+            }
+            delegate.onPartitionsAssigned(partitions);
+        }
+
+        @Override
+        public void onPartitionsLost(Collection<TopicPartition> partitions) {
+            delegate.onPartitionsLost(partitions);
+        }
     }
 }

Reply via email to