This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab88df74f21 KAFKA-20535: Improve async consumer CPU usage under low max.poll.records. (#22199)
ab88df74f21 is described below

commit ab88df74f21af96b59b6d45f257404772873ef12
Author: ChickenchickenLove <[email protected]>
AuthorDate: Mon May 11 22:23:45 2026 +0900

    KAFKA-20535: Improve async consumer CPU usage under low max.poll.records. (#22199)
    
    ### Description
    KAFKA-20332 fixed a correctness issue in the async consumer where the
    application thread could collect buffered records before the background
    thread had checked for pending reconciliations. The fix added a wait on
    `inflightPoll.reconciliationCheckFuture()` in
    `AsyncKafkaConsumer.collectFetch()`.
    
    This restored the correctness guarantee, but it also increased CPU usage
    in low `max.poll.records` scenarios. With `max.poll.records=5`,
    profiling shows that the additional cost mainly comes from the
    application thread waiting on
    `ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(),
    timeoutMs)` even when the consumer group member is not reconciling.
    
    This patch avoids that unnecessary wait by tracking the consumer group
    member state in `AsyncKafkaConsumer`. `AbstractMembershipManager` now
    notifies `MemberStateListener` whenever the consumer member transitions
    to a new state. `AsyncKafkaConsumer` uses this signal to wait for the
    reconciliation check only while the member is in
    `MemberState.RECONCILING`.
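    
    The gating described above can be illustrated with a small, self-contained
    Java sketch. It is not the Kafka implementation: `MembershipSketch`,
    `GatedConsumer`, `StateListener`, the three-value `MemberState` enum and
    the "return no records on timeout" behavior are hypothetical
    simplifications of `AbstractMembershipManager`, `AsyncKafkaConsumer`,
    `MemberStateListener` and the real enum. The manager notifies listeners
    on every transition, the consumer keeps a `volatile` flag that is true
    only in `RECONCILING`, and `collect()` blocks on the reconciliation-check
    future only when that flag is set.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class ReconciliationGateSketch {
    
        // Trimmed-down, hypothetical stand-in for Kafka's MemberState enum.
        enum MemberState { STABLE, RECONCILING, ACKNOWLEDGING }
    
        // Like MemberStateListener: a default no-op callback for state transitions.
        interface StateListener {
            default void onMemberStateChange(MemberState memberState) { }
        }
    
        // Hypothetical membership manager that notifies listeners on every transition.
        static class MembershipSketch {
            private final List<StateListener> listeners = new CopyOnWriteArrayList<>();
            private MemberState state = MemberState.STABLE;
    
            void register(StateListener listener) {
                listeners.add(listener);
            }
    
            void transitionTo(MemberState nextState) {
                state = nextState;
                listeners.forEach(listener -> listener.onMemberStateChange(nextState));
            }
        }
    
        // Hypothetical consumer that waits for the reconciliation check only while RECONCILING.
        static class GatedConsumer implements StateListener {
            // Written by whichever thread drives transitions, read by the application thread.
            private volatile boolean hasPendingReconciliation = false;
            int waits = 0; // how many times collect() actually blocked
    
            @Override
            public void onMemberStateChange(MemberState memberState) {
                hasPendingReconciliation = memberState == MemberState.RECONCILING;
            }
    
            List<String> collect(CompletableFuture<Void> reconciliationCheck,
                                 List<String> buffered, long timeoutMs) {
                if (hasPendingReconciliation && !reconciliationCheck.isDone()) {
                    waits++;
                    try {
                        reconciliationCheck.get(timeoutMs, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        return List.of(); // sketch only: check still pending, return nothing
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return List.of();
                    } catch (ExecutionException e) {
                        throw new IllegalStateException(e.getCause());
                    }
                }
                return new ArrayList<>(buffered);
            }
        }
    
        public static void main(String[] args) {
            MembershipSketch membership = new MembershipSketch();
            GatedConsumer consumer = new GatedConsumer();
            membership.register(consumer);
            List<String> buffered = List.of("r1", "r2");
    
            // Not reconciling: the check is incomplete, but collect() does not wait.
            System.out.println(consumer.collect(new CompletableFuture<>(), buffered, 1000)); // [r1, r2]
            // Reconciling with an incomplete check: block up to 20 ms, then return nothing.
            membership.transitionTo(MemberState.RECONCILING);
            System.out.println(consumer.collect(new CompletableFuture<>(), buffered, 20)); // []
            // Reconciling with the check already complete: no wait.
            System.out.println(consumer.collect(CompletableFuture.completedFuture(null), buffered, 1000)); // [r1, r2]
            System.out.println("waits = " + consumer.waits); // waits = 1
        }
    }
    ```
    
    Only the call made while `RECONCILING` with an incomplete check blocks.
    The `volatile` flag lets the application thread observe transitions made
    on another thread without a lock; the sketch does not attempt to handle a
    transition that races with a `collect()` call already in progress.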
    
    ### Test Conditions
    - Broker
      - Standalone
      - 12 vCPU, 32 GB RAM
    - Producer
      - `bin/kafka-producer-perf-test.sh`, run on the broker host
      - Throughput: 50000 records/sec
      - Record size: 100 bytes
    - Consumers (before, after, optimized)
      - Kubernetes environment
      - All consumers are scheduled on the same worker node.
    - Profiler
      - async-profiler
      - Duration: 180 seconds
    - Branches
      - `before`: 5a2dcf8fd0
      - `after`: 7e1c9db92f
      - `optimized`: this PR
    
    ### Test Result
    
    1. Check the throughput of each consumer
    ```
    Before - RATE: 49997.400259974005 records/sec, total=7050826
    After - RATE: 50002.199780022 records/sec, total=7557854
    Optimized - RATE: 50002.199780022 records/sec, total=7584198
    ```
    - All consumers sustain the same throughput, matching the producer rate of about 50,000 records/sec.
    
    2. Average CPU usage from `kubectl top pod`, sampled every 30 seconds, 10 times
    
    | Revision | Average CPU |
    |---|---:|
    | Before | 225.7m |
    | After | 325.7m |
    | Optimized | 248.4m |
    
    The optimized version reduced average CPU usage from 325.7m to 248.4m,
    about 23.7% lower than `after`.
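    
    The 23.7% figure can be rechecked from the averages in the CPU table
    (values in millicores, as reported by `kubectl top pod`); the second
    ratio compares `optimized` with `before`:
    
    ```latex
    \frac{325.7 - 248.4}{325.7} = \frac{77.3}{325.7} \approx 0.237
    \qquad
    \frac{248.4 - 225.7}{225.7} = \frac{22.7}{225.7} \approx 0.101
    ```
    
    In this run, the optimized version therefore removes about 77m of the
    100m that `after` added on top of `before`, and stays roughly 10% above
    `before`.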
    
    ### Flame Graph Summary
    | Metric | Before | After | Optimized |
    |---|---:|---:|---:|
    | Samples | 2,402 | 3,160 | 2,542 |
    | markReconciliationCheckComplete | 0.00% | 2.82% | 0.51% |
    | setActiveTask | 0.00% | 0.06% | 0.00% |
    | pollTimeMs | 0.00% | 0.00% | 0.00% |
    | AsyncPollEvent | 0.37% | 2.91% | 0.87% |
    | processBackgroundEvents | 2.37% | 1.93% | 2.28% |
    | Reaper | 0.17% | 0.19% | 0.35% |
    | parkNanos | 1.08% | 4.05% | 0.94% |
    | unpark | 0.37% | 2.06% | 0.63% |
    | AsyncKafkaConsumer.poll | 38.68% | 38.32% | 38.20% |
    | AsyncKafkaConsumer.collectFetch | 20.32% | 23.61% | 19.39% |
    | ApplicationEventProcessor.process | 2.66% | 6.46% | 2.83% |
    | ApplicationEventHandler.add | 7.87% | 7.37% | 8.06% |
    
    `After` shows higher CPU usage, and the profile also shows increased
    time in `parkNanos` and `unpark`. This suggests that the additional wait
    on `reconciliationCheckFuture` introduced more application/background
    thread coordination overhead.
    
    ### AsyncKafkaConsumer.collectFetch
    | Metric | Before | After | Optimized |
    |---|---:|---:|---:|
    | AsyncKafkaConsumer.collectFetch samples | 488 | 746 | 493 |
    | AsyncKafkaConsumer.collectFetch % | 20.32% | 23.61% | 19.39% |
    | FetchCollector.collectFetch samples | 482 | 512 | 477 |
    | FetchCollector.collectFetch % | 20.07% | 16.20% | 18.76% |
    | ConsumerUtils.getResult samples | 0 | 194 | 0 |
    | ConsumerUtils.getResult % | 0.00% | 6.14% | 0.00% |
    
    In `after`, the application thread spends additional time in
    `ConsumerUtils.getResult()` while waiting for the reconciliation check
    future. This also increases related park/unpark activity and application
    event processing on the background thread. In the optimized version,
    this wait is skipped unless the member is actually in `RECONCILING`
    state.
    
    ### ConsumerNetworkThread.runOnce
    | Metric | Before | After | Optimized |
    |---|---:|---:|---:|
    | ConsumerNetworkThread.runOnce samples | 1,216 | 1,621 | 1,295 |
    | ConsumerNetworkThread.runOnce % | 50.62% | 51.30% | 50.94% |
    | ConsumerNetworkThread.processApplicationEvents samples | 283 | 442 | 263 |
    | ConsumerNetworkThread.processApplicationEvents % | 11.78% | 13.99% | 10.35% |
    | FetchRequestManager.poll samples | 169 | 194 | 158 |
    | FetchRequestManager.poll % | 7.04% | 6.13% | 6.22% |
    | NetworkClientDelegate.poll samples | 600 | 755 | 684 |
    | NetworkClientDelegate.poll % | 24.98% | 23.89% | 26.91% |
    
    The higher CPU usage in `ConsumerNetworkThread` appears to be a secondary
    effect of the application thread waiting on `reconciliationCheckFuture`
    more often. Each wait requires coordination between the application
    thread and the background thread: the application thread enqueues an
    `AsyncPollEvent` and waits for the reconciliation check to complete,
    while the background thread processes that event and completes the
    future. As a result, `ConsumerNetworkThread.processApplicationEvents`
    and related `AsyncPollEvent` processing show higher CPU usage in the
    `after` profile.
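    
    To make the coordination cost concrete, here is a minimal Java sketch of
    a single application/background round trip. `PollEvent`, `pollOnce` and
    the queue-driven background loop are hypothetical simplifications, not
    the real `AsyncPollEvent`, `ApplicationEventHandler` or
    `ConsumerNetworkThread` code. When the application thread waits, each
    poll adds a potential park on that thread and a wake-up from the
    background thread, which is the kind of activity that grew in the
    `after` profile (`parkNanos`, `unpark`, `processApplicationEvents`).
    
    ```java
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class PollHandoffSketch {
    
        // Hypothetical stand-in for AsyncPollEvent: carries a future the background completes.
        static final class PollEvent {
            final CompletableFuture<Void> reconciliationCheck = new CompletableFuture<>();
        }
    
        private final BlockingQueue<PollEvent> events = new LinkedBlockingQueue<>();
        private final Thread background;
    
        PollHandoffSketch() {
            background = new Thread(() -> {
                try {
                    while (true) {
                        PollEvent event = events.take();          // parks while the queue is empty
                        // A real background thread would check for pending reconciliation here.
                        event.reconciliationCheck.complete(null); // wakes a waiting app thread, if any
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();           // close() requested shutdown
                }
            }, "background-sketch");
            background.setDaemon(true);
            background.start();
        }
    
        /** Application-thread side of one poll; true if it waited and the check completed. */
        boolean pollOnce(boolean waitForCheck, long timeoutMs) {
            PollEvent event = new PollEvent();
            events.add(event);                                    // may wake the background thread
            if (!waitForCheck) {
                return false;                                     // no blocking on the app thread
            }
            try {
                event.reconciliationCheck.get(timeoutMs, TimeUnit.MILLISECONDS); // may park
                return true;
            } catch (TimeoutException | ExecutionException e) {
                return false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    
        void close() {
            background.interrupt();
            try {
                background.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    
        public static void main(String[] args) {
            PollHandoffSketch sketch = new PollHandoffSketch();
            int completed = 0;
            for (int i = 0; i < 1_000; i++) {
                if (sketch.pollOnce(true, 1_000)) {
                    completed++;
                }
            }
            System.out.println("waited round trips completed: " + completed);
            sketch.close();
        }
    }
    ```
    
    With `waitForCheck == false` the event is still handed to the background
    thread, much like the new test in this commit where the `AsyncPollEvent`
    is captured but `poll` returns before the check completes. What the skip
    removes is the per-poll blocking on the application thread, which would
    matter most when a low `max.poll.records` leads to many short polls.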
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../internals/AbstractMembershipManager.java       |  1 +
 .../consumer/internals/AsyncKafkaConsumer.java     | 13 ++++-
 .../consumer/internals/MemberStateListener.java    |  9 ++++
 .../consumer/internals/AsyncKafkaConsumerTest.java | 55 ++++++++++++++++++++++
 4 files changed, 77 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 5782e138202..4d02d447d2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -245,6 +245,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
 
         log.info("Member {} with epoch {} transitioned from {} to {}.", memberId, memberEpoch, state, nextState);
         this.state = nextState;
+        stateUpdatesListeners.forEach(listener -> listener.onMemberStateChange(nextState));
     }
 
     private static boolean isCompletingRebalance(MemberState currentState, MemberState nextState) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 08e5ce30ed6..38dd6cb23c7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -420,6 +420,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
     private final AtomicInteger refCount = new AtomicInteger(0);
 
+    private volatile boolean hasPendingReconciliation = false;
+
     private final MemberStateListener memberStateListener = new MemberStateListener() {
         @Override
         public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) {
@@ -430,6 +432,11 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
             setGroupAssignmentSnapshot(partitions);
         }
+
+        @Override
+        public void onMemberStateChange(MemberState memberState) {
+            setHasPendingReconciliation(memberState == MemberState.RECONCILING);
+        }
     };
 
     public AsyncKafkaConsumer(final ConsumerConfig config,
@@ -873,6 +880,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         groupAssignmentSnapshot.set(Collections.unmodifiableSet(partitions));
     }
 
+    void setHasPendingReconciliation(final boolean hasPendingReconciliation) {
+        this.hasPendingReconciliation = hasPendingReconciliation;
+    }
+
     @Override
     public void registerMetricForSubscription(KafkaMetric metric) {
         if (!metrics().containsKey(metric.metricName())) {
@@ -2028,7 +2039,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         // This is key because partitions may need revocation, so we need to wait for the reconciliation check
         // that triggers commits and marks partitions as pending revocation, before we can
         // safely collect records from the buffer.
-        if (inflightPoll != null && !inflightPoll.isReconciliationCheckComplete()) {
+        if (hasPendingReconciliation && inflightPoll != null && !inflightPoll.isReconciliationCheckComplete()) {
             // If the background hasn't had the time to check for pending reconciliation,
             // we need to wait for that check before moving on (instead of returning empty right away,
             // which will lead to blocking on buffer data)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
index 98b6271fcc0..cbf28b13af3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
@@ -47,4 +47,13 @@ public interface MemberStateListener {
     default void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
 
     }
+
+    /**
+     * Called whenever the member transitions to a new state.
+     *
+     * @param memberState The member state.
+     */
+    default void onMemberStateChange(MemberState memberState) {
+   
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index f64c8102280..c6833d6aee4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -609,6 +609,7 @@ public class AsyncKafkaConsumerTest {
             // Do NOT mark reconciliation check complete - simulating background hasn't processed it yet
             return null;
         }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+        consumer.setHasPendingReconciliation(true);
 
         // Poll should return empty because reconciliation check is not complete.
         ConsumerRecords<?, ?> result1 = consumer.poll(Duration.ZERO);
@@ -623,6 +624,60 @@ public class AsyncKafkaConsumerTest {
         assertEquals(2, result2.count(), "Expected 2 records after reconciliation check is complete");
     }
 
+    @Test
+    public void testPollDoesNotWaitForReconciliationCheckIfNoPendingReconciliation() {
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        final List<ConsumerRecord<String, String>> records = asList(
+                new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"),
+                new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
+        );
+
+        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(
+                mock(FetchBuffer.class),
+                new ConsumerInterceptors<>(Collections.emptyList(), metrics),
+                mock(ConsumerRebalanceListenerInvoker.class),
+                subscriptions);
+
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        // PositionsValidator starts with metadataUpdateVersion=-1. Stub metadata.updateVersion() to match,
+        // so canSkipUpdateFetchPositions() passes and we test the reconciliation check path.
+        doReturn(-1).when(metadata).updateVersion();
+
+        completeTopicSubscriptionChangeEventSuccessfully();
+        consumer.subscribe(singleton(topicName), mock(ConsumerRebalanceListener.class));
+        // Simulate partition assignment from group coordinator
+        subscriptions.assignFromSubscribed(singleton(tp));
+
+        // Set up position so canSkipUpdateFetchPositions() returns true (partition in FETCHING state)
+        completeSeekUnvalidatedEventSuccessfully();
+        subscriptions.seek(tp, 0);
+
+        // Set up fetch collector to return records when called
+        doReturn(Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), "")))
+                .when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+        // Capture the AsyncPollEvent but leave the reconciliation check incomplete.
+        // Since there is no pending reconciliation, poll should not wait for it.
+        AtomicReference<AsyncPollEvent> capturedEvent = new AtomicReference<>();
+        doAnswer(invocation -> {
+            AsyncPollEvent event = invocation.getArgument(0);
+            assertTrue(capturedEvent.compareAndSet(null, event));
+            // Do NOT mark reconciliation check complete - simulating background hasn't processed it yet
+            return null;
+        }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+        consumer.setHasPendingReconciliation(false);
+        
+        // Poll does not wait AsyncPollEvent if there is no pending reconciliation.
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ZERO);
+
+        assertNotNull(capturedEvent.get(), "AsyncPollEvent should have been captured");
+        assertFalse(capturedEvent.get().isReconciliationCheckComplete(), "Reconciliation check should still be incomplete");
+        assertEquals(2, result.count(), "Expected records without waiting when no reconciliation is pending");
+    }
+
     @Test
     public void testEnsureCallbackExecutedByApplicationThread() {
         consumer = newConsumer();
