This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 403f6c1caa4 KAFKA-19840: Handle exceptions in acknowledgement callback 
(#20880)
403f6c1caa4 is described below

commit 403f6c1caa4aa63c14c8da32ad98eff3a58d5853
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Nov 14 19:03:36 2025 +0000

    KAFKA-19840: Handle exceptions in acknowledgement callback (#20880)
    
    Catch exceptions from the acknowledgement commit callback. Previously,
    the exceptions were thrown by the methods of the KafkaShareConsumer
    interface, but this was not described in the KIP and did not make sense
    in practice.
    
    Reviewers: Apoorv Mittal <[email protected]>, ShivsundarR
    <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  |  28 ++--
 .../kafka/clients/consumer/KafkaShareConsumer.java |   2 +-
 .../AcknowledgementCommitCallbackHandler.java      |  13 +-
 .../consumer/internals/RequestManagers.java        |   4 +-
 .../internals/ShareConsumeRequestManager.java      |  74 +++-------
 .../consumer/internals/ShareConsumerImpl.java      | 161 ++++++++++++---------
 .../consumer/internals/events/BackgroundEvent.java |   2 -
 ...ckEvent.java => ShareAcknowledgementEvent.java} |  21 ++-
 .../events/ShareAcknowledgementEventHandler.java   |  60 ++++++++
 .../ShareRenewAcknowledgementsCompleteEvent.java   |  42 ------
 .../internals/ShareConsumeRequestManagerTest.java  |  30 ++--
 .../consumer/internals/ShareConsumerImplTest.java  |  15 +-
 12 files changed, 236 insertions(+), 216 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 4cc6c195eac..da253fa1c42 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -58,16 +58,15 @@ import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
-import org.apache.kafka.common.test.api.Flaky;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.test.NoRetryException;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 
@@ -1621,10 +1620,9 @@ public class ShareConsumerTest {
     }
 
     /**
-     * Test to verify that the acknowledgement commit callback can throw an 
exception, and it is propagated
+     * Test to verify that the acknowledgement commit callback can throw an 
exception, and it is not propagated
      * to the caller of poll().
      */
-    @Flaky("KAFKA-19840") // https://issues.apache.org/jira/browse/KAFKA-19840
     @ClusterTest
     public void testAcknowledgementCommitCallbackThrowsException() throws 
InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
@@ -1635,28 +1633,35 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallbackThrows<>());
+            AtomicBoolean callbackCalled = new AtomicBoolean(false);
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallbackThrows(callbackCalled));
             shareConsumer.subscribe(Set.of(tp.topic()));
 
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                 DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records 
for share consumer");
 
-            AtomicBoolean exceptionThrown = new AtomicBoolean(false);
             TestUtils.waitForCondition(() -> {
                 try {
                     shareConsumer.poll(Duration.ofMillis(500));
-                } catch 
(org.apache.kafka.common.errors.OutOfOrderSequenceException e) {
-                    exceptionThrown.set(true);
+                } catch (Exception e) {
+                    throw new NoRetryException(e);
                 }
-                return exceptionThrown.get();
-            }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected 
exception");
+                return callbackCalled.get();
+            }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Received unexpected exception 
or callback not called");
             verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    private static class TestableAcknowledgementCommitCallbackThrows<K, V> 
implements AcknowledgementCommitCallback {
+    private static class TestableAcknowledgementCommitCallbackThrows 
implements AcknowledgementCommitCallback {
+        private final AtomicBoolean callbackCalled;
+
+        public TestableAcknowledgementCommitCallbackThrows(AtomicBoolean 
callbackCalled) {
+            this.callbackCalled = callbackCalled;
+        }
+
         @Override
         public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, 
Exception exception) {
+            callbackCalled.set(true);
             throw new 
org.apache.kafka.common.errors.OutOfOrderSequenceException("Exception thrown in 
TestableAcknowledgementCommitCallbackThrows.onComplete");
         }
     }
@@ -2339,7 +2344,6 @@ public class ShareConsumerTest {
         verifyShareGroupStateTopicRecordsProduced();
     }
 
-    @Disabled("KAFKA-19840") // 
https://issues.apache.org/jira/browse/KAFKA-19840
     @ClusterTest(
         brokers = 1,
         serverProperties = {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index 7bab150f17f..0d6b08910d2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -555,7 +555,7 @@ public class KafkaShareConsumer<K, V> implements 
ShareConsumer<K, V> {
      * the acknowledgements to commit have been indicated using {@link 
#acknowledge(ConsumerRecord)} or
      * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer 
is using implicit acknowledgement,
      * all the records returned by the latest call to {@link #poll(Duration)} 
are acknowledged.
-
+     *
      * <p>
      * This is a synchronous commit and will block until either the commit 
succeeds, an unrecoverable error is
      * encountered (in which case it is thrown to the caller), or the timeout 
expires.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java
index b746e1a2135..2f4d7b4c2be 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java
@@ -23,8 +23,6 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,23 +42,18 @@ public class AcknowledgementCommitCallbackHandler {
     }
 
     void onComplete(List<Map<TopicIdPartition, Acknowledgements>> 
acknowledgementsMapList) {
-        final ArrayList<Throwable> exceptions = new ArrayList<>();
         acknowledgementsMapList.forEach(acknowledgementsMap -> 
acknowledgementsMap.forEach((partition, acknowledgements) -> {
             KafkaException exception = 
acknowledgements.getAcknowledgeException();
             Set<Long> offsets = 
acknowledgements.getAcknowledgementsTypeMap().keySet();
-            Set<Long> offsetsCopy = Collections.unmodifiableSet(offsets);
+            Set<Long> offsetsCopy = Set.copyOf(offsets);
             enteredCallback = true;
             try {
-                
acknowledgementCommitCallback.onComplete(Collections.singletonMap(partition, 
offsetsCopy), exception);
-            } catch (Throwable e) {
+                acknowledgementCommitCallback.onComplete(Map.of(partition, 
offsetsCopy), exception);
+            } catch (Exception e) {
                 LOG.error("Exception thrown by acknowledgement commit 
callback", e);
-                exceptions.add(e);
             } finally {
                 enteredCallback = false;
             }
         }));
-        if (!exceptions.isEmpty()) {
-            throw ConsumerUtils.maybeWrapAsKafkaException(exceptions.get(0), 
"Exception thrown by acknowledgement commit callback");
-        }
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 7f7dbe2fa5e..e9585d2d806 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
@@ -317,6 +318,7 @@ public class RequestManagers implements Closeable {
     @SuppressWarnings({"checkstyle:ParameterNumber"})
     public static Supplier<RequestManagers> supplier(final Time time,
                                                      final LogContext 
logContext,
+                                                     final 
ShareAcknowledgementEventHandler shareAcknowledgementEventHandler,
                                                      final 
BackgroundEventHandler backgroundEventHandler,
                                                      final 
ShareConsumerMetadata metadata,
                                                      final SubscriptionState 
subscriptions,
@@ -371,7 +373,7 @@ public class RequestManagers implements Closeable {
                         subscriptions,
                         shareFetchConfig,
                         fetchBuffer,
-                        backgroundEventHandler,
+                        shareAcknowledgementEventHandler,
                         shareFetchMetricsManager,
                         retryBackoffMs,
                         retryBackoffMaxMs);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index eff925d342e..332492245e7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -20,9 +20,8 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
-import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -83,7 +82,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     private final SubscriptionState subscriptions;
     private final ShareFetchConfig shareFetchConfig;
     protected final ShareFetchBuffer shareFetchBuffer;
-    private final BackgroundEventHandler backgroundEventHandler;
+    private final ShareAcknowledgementEventHandler acknowledgeEventHandler;
     private final Map<Integer, ShareSessionHandler> sessionHandlers;
     private final Set<Integer> nodesWithPendingRequests;
     private final ShareFetchMetricsManager metricsManager;
@@ -108,7 +107,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                                final SubscriptionState subscriptions,
                                final ShareFetchConfig shareFetchConfig,
                                final ShareFetchBuffer shareFetchBuffer,
-                               final BackgroundEventHandler 
backgroundEventHandler,
+                               final ShareAcknowledgementEventHandler 
acknowledgeEventHandler,
                                final ShareFetchMetricsManager metricsManager,
                                final long retryBackoffMs,
                                final long retryBackoffMaxMs) {
@@ -120,7 +119,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         this.subscriptions = subscriptions;
         this.shareFetchConfig = shareFetchConfig;
         this.shareFetchBuffer = shareFetchBuffer;
-        this.backgroundEventHandler = backgroundEventHandler;
+        this.acknowledgeEventHandler = acknowledgeEventHandler;
         this.metricsManager = metricsManager;
         this.retryBackoffMs = retryBackoffMs;
         this.retryBackoffMaxMs = retryBackoffMaxMs;
@@ -228,8 +227,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             } else {
                                 log.debug("Leader for the partition is down or 
has changed, failing Acknowledgements for partition {}", tip);
                                 
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                                
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
-                                
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acks));
+                                maybeSendShareAcknowledgementEvent(Map.of(tip, 
acks), true);
                             }
                         });
 
@@ -276,8 +274,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             // Failing the acknowledgements as we cannot have piggybacked 
acknowledgements in the initial ShareFetchRequest.
             log.debug("Cannot send acknowledgements on initial epoch for 
ShareSession for partition {}", tip);
             
acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
-            sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, 
acknowledgements));
-            maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, 
acknowledgements));
+            maybeSendShareAcknowledgementEvent(Map.of(tip, acknowledgements), 
true);
             return false;
         } else {
             metricsManager.recordAcknowledgementSent(acknowledgements.size());
@@ -385,18 +382,13 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         this.isAcknowledgementCommitCallbackRegistered = 
isAcknowledgementCommitCallbackRegistered;
     }
 
-    private void 
maybeSendShareAcknowledgeCommitCallbackEvent(Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap) {
-        if (isAcknowledgementCommitCallbackRegistered) {
-            ShareAcknowledgementCommitCallbackEvent event = new 
ShareAcknowledgementCommitCallbackEvent(acknowledgementsMap);
-            backgroundEventHandler.add(event);
+    private void maybeSendShareAcknowledgementEvent(Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap, boolean checkForRenewAcknowledgements) {
+        if (isAcknowledgementCommitCallbackRegistered || 
checkForRenewAcknowledgements) {
+            ShareAcknowledgementEvent event = new 
ShareAcknowledgementEvent(acknowledgementsMap, checkForRenewAcknowledgements);
+            acknowledgeEventHandler.add(event);
         }
     }
 
-    private void 
sendShareRenewAcknowledgementsCompleteEvent(Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap) {
-        ShareRenewAcknowledgementsCompleteEvent event = new 
ShareRenewAcknowledgementsCompleteEvent(acknowledgementsMap);
-        backgroundEventHandler.add(event);
-    }
-
     /**
      *
      * @param acknowledgeRequestState Contains the acknowledgements to be sent.
@@ -544,8 +536,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             resultCount.incrementAndGet();
                         } else {
                             
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                            
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()));
-                            
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()));
+                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()), true);
                         }
                     }
                 }
@@ -564,7 +555,6 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                     ));
                 }
             }
-
         });
 
         resultHandler.completeIfEmpty();
@@ -621,8 +611,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             }
                         } else {
                             
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                            
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()));
-                            
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()));
+                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()), true);
                         }
                     }
                 }
@@ -660,8 +649,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 }
             } else {
                 
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, 
nodeAcks.acknowledgements()));
-                maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, 
nodeAcks.acknowledgements()));
+                maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcks.acknowledgements()), true);
             }
         });
 
@@ -680,8 +668,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             }
                         } else {
                             
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                            
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
-                            
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acks));
+                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
acks), true);
                         }
                     });
                 }
@@ -770,17 +757,13 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                     metadata.requestUpdate(false);
                 }
                 // Complete any in-flight acknowledgements with the error code 
from the response.
-                Map<TopicIdPartition, Acknowledgements> 
nodeAcknowledgementsInFlight = 
fetchAcknowledgementsInFlight.get(fetchTarget.id());
+                Map<TopicIdPartition, Acknowledgements> 
nodeAcknowledgementsInFlight = 
fetchAcknowledgementsInFlight.remove(fetchTarget.id());
                 if (nodeAcknowledgementsInFlight != null) {
                     nodeAcknowledgementsInFlight.forEach((tip, acks) -> {
                         
acks.complete(Errors.forCode(response.error().code()).exception());
                         
metricsManager.recordFailedAcknowledgements(acks.size());
                     });
-                    if (requestData.isRenewAck()) {
-                        
sendShareRenewAcknowledgementsCompleteEvent(nodeAcknowledgementsInFlight);
-                    }
-                    
maybeSendShareAcknowledgeCommitCallbackEvent(nodeAcknowledgementsInFlight);
-                    nodeAcknowledgementsInFlight.clear();
+                    
maybeSendShareAcknowledgementEvent(nodeAcknowledgementsInFlight, 
requestData.isRenewAck());
                 }
                 return;
             }
@@ -818,10 +801,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                         
acks.complete(Errors.forCode(partitionData.acknowledgeErrorCode())
                                 
.exception(partitionData.acknowledgeErrorMessage()));
                         Map<TopicIdPartition, Acknowledgements> acksMap = 
Map.of(tip, acks);
-                        if (requestData.isRenewAck()) {
-                            
sendShareRenewAcknowledgementsCompleteEvent(acksMap);
-                        }
-                        maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
+                        maybeSendShareAcknowledgementEvent(acksMap, 
requestData.isRenewAck());
                     }
                 }
 
@@ -858,8 +838,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) {
                 
fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition, 
acknowledgements) -> {
                     acknowledgements.complete(new 
InvalidRecordStateException(INVALID_RESPONSE));
-                    
sendShareRenewAcknowledgementsCompleteEvent(Map.of(partition, 
acknowledgements));
-                    
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition, 
acknowledgements));
+                    maybeSendShareAcknowledgementEvent(Map.of(partition, 
acknowledgements), true);
                 });
             }
 
@@ -906,10 +885,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                             
acks.complete(Errors.UNKNOWN_SERVER_ERROR.exception());
                         }
                         Map<TopicIdPartition, Acknowledgements> acksMap = 
Map.of(tip, acks);
-                        if (requestData.isRenewAck()) {
-                            
sendShareRenewAcknowledgementsCompleteEvent(nodeAcknowledgementsInFlight);
-                        }
-                        maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
+                        maybeSendShareAcknowledgementEvent(acksMap, 
requestData.isRenewAck());
                     }
                 }
             }));
@@ -1439,20 +1415,14 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         public void complete(TopicIdPartition partition, Acknowledgements 
acknowledgements, AcknowledgeRequestType type, boolean isRenewAck) {
             if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) {
                 if (acknowledgements != null) {
-                    if (isRenewAck) {
-                        
sendShareRenewAcknowledgementsCompleteEvent(Map.of(partition, 
acknowledgements));
-                    }
-                    
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition, 
acknowledgements));
+                    maybeSendShareAcknowledgementEvent(Map.of(partition, 
acknowledgements), isRenewAck);
                 }
             } else {
                 if (acknowledgements != null) {
                     result.put(partition, acknowledgements);
                 }
                 if (remainingResults != null && 
remainingResults.decrementAndGet() == 0) {
-                    if (isRenewAck) {
-                        sendShareRenewAcknowledgementsCompleteEvent(result);
-                    }
-                    maybeSendShareAcknowledgeCommitCallbackEvent(result);
+                    maybeSendShareAcknowledgementEvent(result, isRenewAck);
                     future.ifPresent(future -> future.complete(result));
                 }
             }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index a2274b8e9cd..079a6280055 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.ShareConsumer;
@@ -41,11 +40,11 @@ import 
org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
 import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -121,16 +120,34 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link 
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is 
created and executes in the
+     * application thread for the purpose of processing {@link 
ShareAcknowledgementEvent share acknowledgement events}
+     * generated by the {@link ConsumerNetworkThread network thread}. These 
are distinct from background events because
+     * they do not carry exception information which needs to be thrown in the 
application thread. As a result, they
+     * are held on a separate queue. Share acknowledgement events are used 
when there is an acknowledgement commit
+     * callback registered, and when renew acknowledgements are being used.
+     */
+    private class ShareAcknowledgementEventProcessor implements 
EventProcessor<ShareAcknowledgementEvent> {
+
+        public ShareAcknowledgementEventProcessor() {}
+
+        @Override
+        public void process(final ShareAcknowledgementEvent event) {
+            if (acknowledgementCommitCallbackHandler != null) {
+                completedAcknowledgements.add(event.acknowledgementsMap());
+            }
+            if (event.checkForRenewAcknowledgements()) {
+                currentFetch.renew(event.acknowledgementsMap());
+            }
+        }
+    }
+
     /**
      * An {@link 
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is 
created and executes in the
      * application thread for the purpose of processing {@link BackgroundEvent 
background events} generated by the
-     * {@link ConsumerNetworkThread network thread}.
-     * Those events are generally of two types:
-     *
-     * <ul>
-     *     <li>Errors that occur in the network thread that need to be 
propagated to the application thread</li>
-     *     <li>{@link ConsumerRebalanceListener} callbacks that are to be 
executed on the application thread</li>
-     * </ul>
+     * {@link ConsumerNetworkThread network thread}. These are errors that 
occur in the network thread that need to be
+     * propagated to the application thread.
      */
     private class BackgroundEventProcessor implements 
EventProcessor<BackgroundEvent> {
 
@@ -138,37 +155,16 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
 
         @Override
         public void process(final BackgroundEvent event) {
-            switch (event.type()) {
-                case ERROR:
-                    process((ErrorEvent) event);
-                    break;
-
-                case SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK:
-                    process((ShareAcknowledgementCommitCallbackEvent) event);
-                    break;
-
-                case SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE:
-                    process((ShareRenewAcknowledgementsCompleteEvent) event);
-                    break;
-
-                default:
-                    throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+            if (event.type() == BackgroundEvent.Type.ERROR) {
+                process((ErrorEvent) event);
+            } else {
+                throw new IllegalArgumentException("Background event type " + 
event.type() + " was not expected");
             }
         }
 
         private void process(final ErrorEvent event) {
             throw event.error();
         }
-
-        private void process(final ShareAcknowledgementCommitCallbackEvent 
event) {
-            if (acknowledgementCommitCallbackHandler != null) {
-                completedAcknowledgements.add(event.acknowledgementsMap());
-            }
-        }
-
-        private void process(final ShareRenewAcknowledgementsCompleteEvent 
event) {
-            currentFetch.renew(event.acknowledgementsMap());
-        }
     }
 
     private final ApplicationEventHandler applicationEventHandler;
@@ -178,6 +174,9 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     private Logger log;
     private final String clientId;
     private final String groupId;
+    private final ShareAcknowledgementEventHandler acknowledgementEventHandler;
+    private final BlockingQueue<ShareAcknowledgementEvent> 
acknowledgementEventQueue;
+    private final ShareAcknowledgementEventProcessor 
acknowledgementEventProcessor;
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
     private final BackgroundEventHandler backgroundEventHandler;
     private final BackgroundEventProcessor backgroundEventProcessor;
@@ -225,6 +224,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 ApplicationEventHandler::new,
                 CompletableEventReaper::new,
                 ShareFetchCollector::new,
+                new LinkedBlockingQueue<>(),
                 new LinkedBlockingQueue<>()
         );
     }
@@ -237,6 +237,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                       final ApplicationEventHandlerFactory 
applicationEventHandlerFactory,
                       final AsyncKafkaConsumer.CompletableEventReaperFactory 
backgroundEventReaperFactory,
                       final ShareFetchCollectorFactory<K, V> 
fetchCollectorFactory,
+                      final LinkedBlockingQueue<ShareAcknowledgementEvent> 
acknowledgementEventQueue,
                       final LinkedBlockingQueue<BackgroundEvent> 
backgroundEventQueue) {
         try {
             GroupRebalanceConfig groupRebalanceConfig = new 
GroupRebalanceConfig(
@@ -247,6 +248,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
             maybeThrowInvalidGroupIdException();
             LogContext logContext = createLogContext(clientId, groupId);
+            this.acknowledgementEventQueue = acknowledgementEventQueue;
             this.backgroundEventQueue = backgroundEventQueue;
             this.log = logContext.logger(getClass());
 
@@ -274,6 +276,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             ShareFetchMetricsManager shareFetchMetricsManager = 
createShareFetchMetricsManager(metrics);
             ApiVersions apiVersions = new ApiVersions();
             final BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
+            this.acknowledgementEventHandler = new 
ShareAcknowledgementEventHandler(acknowledgementEventQueue);
             this.backgroundEventHandler = new BackgroundEventHandler(
                 backgroundEventQueue, time, asyncConsumerMetrics);
 
@@ -297,6 +300,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             final Supplier<RequestManagers> requestManagersSupplier = 
RequestManagers.supplier(
                     time,
                     logContext,
+                    acknowledgementEventHandler,
                     backgroundEventHandler,
                     metadata,
                     subscriptions,
@@ -324,6 +328,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                     requestManagersSupplier,
                     asyncConsumerMetrics);
 
+            this.acknowledgementEventProcessor = new 
ShareAcknowledgementEventProcessor();
             this.backgroundEventProcessor = new BackgroundEventProcessor();
             this.backgroundEventReaper = 
backgroundEventReaperFactory.build(logContext);
 
@@ -389,6 +394,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_SHARE_METRIC_GROUP);
 
         final BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
+        this.acknowledgementEventQueue = new LinkedBlockingQueue<>();
+        this.acknowledgementEventHandler = new 
ShareAcknowledgementEventHandler(acknowledgementEventQueue);
         this.backgroundEventQueue = new LinkedBlockingQueue<>();
         this.backgroundEventHandler = new BackgroundEventHandler(
             backgroundEventQueue, time, asyncConsumerMetrics);
@@ -402,6 +409,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         final Supplier<RequestManagers> requestManagersSupplier = 
RequestManagers.supplier(
                 time,
                 logContext,
+                acknowledgementEventHandler,
                 backgroundEventHandler,
                 metadata,
                 subscriptions,
@@ -430,6 +438,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 requestManagersSupplier,
                 asyncConsumerMetrics);
 
+        this.acknowledgementEventProcessor = new 
ShareAcknowledgementEventProcessor();
         this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = new CompletableEventReaper(logContext);
 
@@ -447,6 +456,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                       final ShareFetchCollector<K, V> fetchCollector,
                       final Time time,
                       final ApplicationEventHandler applicationEventHandler,
+                      final BlockingQueue<ShareAcknowledgementEvent> 
acknowledgementEventQueue,
                       final BlockingQueue<BackgroundEvent> 
backgroundEventQueue,
                       final CompletableEventReaper backgroundEventReaper,
                       final Metrics metrics,
@@ -463,6 +473,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.fetchBuffer = fetchBuffer;
         this.fetchCollector = fetchCollector;
         this.time = time;
+        this.acknowledgementEventQueue = acknowledgementEventQueue;
+        this.acknowledgementEventProcessor = new 
ShareAcknowledgementEventProcessor();
         this.backgroundEventQueue = backgroundEventQueue;
         this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = backgroundEventReaper;
@@ -478,6 +490,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.clientTelemetryReporter = Optional.empty();
         this.completedAcknowledgements = Collections.emptyList();
         this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_SHARE_METRIC_GROUP);
+        this.acknowledgementEventHandler = new 
ShareAcknowledgementEventHandler(acknowledgementEventQueue);
         this.backgroundEventHandler = new BackgroundEventHandler(
                 backgroundEventQueue, time, asyncConsumerMetrics);
     }
@@ -579,8 +592,11 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
 
         acquireAndEnsureOpen();
         try {
+            // Throw any errors notified by the background thread
+            processBackgroundEvents();
+
             // Handle any completed acknowledgements for which we already have 
the responses
-            handleCompletedAcknowledgements(false);
+            handleCompletedAcknowledgements();
 
             // If using implicit acknowledgement, acknowledge the previously 
fetched records
             acknowledgeBatchIfImplicitAcknowledgement();
@@ -600,8 +616,6 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 // Make sure the network thread can tell the application is 
actively polling
                 applicationEventHandler.add(new 
SharePollEvent(timer.currentTimeMs()));
 
-                processBackgroundEvents();
-
                 // We must not allow wake-ups between polling for fetches and 
returning the records.
                 // A wake-up between returned fetches and returning records 
would lead to never
                 // returning the records in the fetches. Thus, we trigger a 
possible wake-up before we poll fetches.
@@ -610,14 +624,18 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 final ShareFetch<K, V> fetch = pollForFetches(timer);
                 if (!fetch.isEmpty()) {
                     currentFetch = fetch;
+                    handleCompletedAcknowledgements();
                     return new ConsumerRecords<>(fetch.records(), Map.of());
                 }
 
+                // Throw any errors notified by the background thread
+                processBackgroundEvents();
                 metadata.maybeThrowAnyException();
 
                 // We will wait for retryBackoffMs
             } while (timer.notExpired());
 
+            handleCompletedAcknowledgements();
             return ConsumerRecords.empty();
         } catch (ShareFetchException e) {
             currentFetch = (ShareFetch<K, V>) e.shareFetch();
@@ -625,14 +643,6 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         } finally {
             kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
 
-            // Handle any acknowledgements which completed while we were 
waiting, but do not throw
-            // the exception because the fetched records would then not be 
returned to the caller
-            try {
-                handleCompletedAcknowledgements(false);
-            } catch (Throwable t) {
-                log.warn("Exception thrown in acknowledgement commit 
callback", t);
-            }
-
             release();
         }
     }
@@ -776,7 +786,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         try {
             // Handle any completed acknowledgements for which we already have 
the responses
-            handleCompletedAcknowledgements(false);
+            handleCompletedAcknowledgements();
 
             // If using implicit acknowledgement, acknowledge the previously 
fetched records
             acknowledgeBatchIfImplicitAcknowledgement();
@@ -803,7 +813,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                     });
 
                     // Handle any acknowledgements which completed while we 
were waiting
-                    handleCompletedAcknowledgements(false);
+                    handleCompletedAcknowledgements();
 
                     return result;
                 } finally {
@@ -823,7 +833,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         try {
             // Handle any completed acknowledgements for which we already have 
the responses
-            handleCompletedAcknowledgements(false);
+            handleCompletedAcknowledgements();
 
             // If using implicit acknowledgement, acknowledge the previously 
fetched records
             acknowledgeBatchIfImplicitAcknowledgement();
@@ -963,7 +973,9 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         swallow(log, Level.ERROR, "Failed to stop finding coordinator",
                 this::stopFindCoordinatorOnClose, firstException);
         swallow(log, Level.ERROR, "Failed invoking acknowledgement commit 
callback",
-                () -> handleCompletedAcknowledgements(true), firstException);
+                this::handleCompletedAcknowledgements, firstException);
+        swallow(log, Level.ERROR, "Failed processing background events",
+                this::processBackgroundEventsOnClose, firstException);
         if (applicationEventHandler != null)
             closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
         closeTimer.update();
@@ -1101,18 +1113,13 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
      * Handles any completed acknowledgements. If there is an acknowledgement 
commit callback registered,
      * call it. Otherwise, discard the information about completed 
acknowledgements because the application
      * is not interested.
-     * <p>
-     * If the acknowledgement commit callback throws an exception, this method 
will throw an exception.
      */
-    private void handleCompletedAcknowledgements(boolean onClose) {
-        if (backgroundEventQueue == null || backgroundEventReaper == null || 
backgroundEventProcessor == null) {
+    private void handleCompletedAcknowledgements() {
+        if (acknowledgementEventQueue == null || acknowledgementEventHandler 
== null) {
             return;
         }
-        // If the user gets any fatal errors, they will get these exceptions 
in the background queue.
-        // While closing, we ignore these exceptions so that the consumers 
close successfully.
-        processBackgroundEvents(onClose ? e -> (e instanceof 
GroupAuthorizationException
-                || e instanceof TopicAuthorizationException
-                || e instanceof InvalidTopicException) : e -> false);
+
+        processAcknowledgementEvents();
 
         if (!completedAcknowledgements.isEmpty()) {
             try {
@@ -1171,20 +1178,44 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         return ShareAcknowledgementMode.fromString(s);
     }
 
-    private void processBackgroundEvents(final Predicate<Exception> 
ignoreErrorEventException) {
+    /**
+     * Process acknowledgement events, if any, that were produced by the 
{@link ConsumerNetworkThread network thread}.
+     */
+    void processAcknowledgementEvents() {
+        List<ShareAcknowledgementEvent> events = 
acknowledgementEventHandler.drainEvents();
+        if (!events.isEmpty()) {
+            for (ShareAcknowledgementEvent event : events) {
+                try {
+                    acknowledgementEventProcessor.process(event);
+                } catch (Exception e) {
+                    log.warn("An error occurred when processing the 
acknowledgement event: {}", e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Process background events on close. Except some expected exceptions 
which might occur
+     * during close, exceptions encountered are thrown.
+     */
+    void processBackgroundEventsOnClose() {
+        if (backgroundEventQueue == null || backgroundEventHandler == null) {
+            return;
+        }
+
         try {
             processBackgroundEvents();
         } catch (Exception e) {
-            if (!ignoreErrorEventException.test(e))
+            if (!(e instanceof GroupAuthorizationException || e instanceof 
TopicAuthorizationException || e instanceof InvalidTopicException))
                 throw e;
         }
     }
 
     /**
-     * Process the events—if any—that were produced by the {@link 
ConsumerNetworkThread network thread}.
-     * It is possible that {@link ErrorEvent an error}
-     * could occur when processing the events. In such cases, the processor 
will take a reference to the first
-     * error, continue to process the remaining events, and then throw the 
first error that occurred.
+     * Process background events, if any, that were produced by the {@link 
ConsumerNetworkThread network thread}.
+     * It is possible that {@link ErrorEvent an error} could occur when 
processing the events. In such
+     * cases, the processor will take a reference to the first error, continue 
to process the remaining
+     * events, and then throw the first error that occurred.
      *
      * Visible for testing.
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 9f84df9414d..dc33a134bcf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -28,8 +28,6 @@ public abstract class BackgroundEvent {
     public enum Type {
         ERROR,
         CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED,
-        SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK,
-        SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE,
         STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED,
         STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED,
         STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementCommitCallbackEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
similarity index 62%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementCommitCallbackEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
index 3f52f853208..9dfc06b1f42 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementCommitCallbackEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
@@ -17,26 +17,31 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.internals.Acknowledgements;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
 import org.apache.kafka.common.TopicIdPartition;
 
-import java.util.HashMap;
 import java.util.Map;
 
-public class ShareAcknowledgementCommitCallbackEvent extends BackgroundEvent {
+/**
+ * This is the class of events created by the {@link ConsumerNetworkThread 
network thread} to indicate completion
+ * of acknowledgements.
+ */
+public class ShareAcknowledgementEvent {
 
     private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
+    private final boolean checkForRenewAcknowledgements;
 
-    public ShareAcknowledgementCommitCallbackEvent(Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap) {
-        super(Type.SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK);
-        this.acknowledgementsMap = new HashMap<>(acknowledgementsMap);
+    public ShareAcknowledgementEvent(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap,
+                                     boolean checkForRenewAcknowledgements) {
+        this.acknowledgementsMap = acknowledgementsMap;
+        this.checkForRenewAcknowledgements = checkForRenewAcknowledgements;
     }
 
     public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
         return acknowledgementsMap;
     }
 
-    @Override
-    public String toStringBase() {
-        return super.toStringBase() + ", acknowledgementsMap=" + 
acknowledgementsMap;
+    public boolean checkForRenewAcknowledgements() {
+        return checkForRenewAcknowledgements;
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEventHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEventHandler.java
new file mode 100644
index 00000000000..c1c2d40e00b
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEventHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * An event handler that receives {@link BackgroundEvent background events} 
from the
+ * {@link ConsumerNetworkThread network thread} which are then made available 
to the application thread
+ * via an {@link EventProcessor}.
+ */
+
+public class ShareAcknowledgementEventHandler {
+
+    private final BlockingQueue<ShareAcknowledgementEvent> eventQueue;
+
+    public ShareAcknowledgementEventHandler(final 
BlockingQueue<ShareAcknowledgementEvent> eventQueue) {
+        this.eventQueue = eventQueue;
+    }
+
+    /**
+     * Add a {@link ShareAcknowledgementEvent} to the handler.
+     *
+     * @param event A {@link ShareAcknowledgementEvent} created by the {@link 
ConsumerNetworkThread network thread}
+     */
+    public void add(ShareAcknowledgementEvent event) {
+        Objects.requireNonNull(event, "ShareAcknowledgementCompleteEvent 
provided to add must be non-null");
+        eventQueue.add(event);
+    }
+
+    /**
+     * Drain all the {@link ShareAcknowledgementEvent events} from the handler.
+     *
+     * @return A list of {@link ShareAcknowledgementEvent events} that were 
drained
+     */
+    public List<ShareAcknowledgementEvent> drainEvents() {
+        List<ShareAcknowledgementEvent> events = new ArrayList<>();
+        eventQueue.drainTo(events);
+        return events;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
deleted file mode 100644
index 1151ac04a52..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.clients.consumer.internals.Acknowledgements;
-import org.apache.kafka.common.TopicIdPartition;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class ShareRenewAcknowledgementsCompleteEvent extends BackgroundEvent {
-
-    private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
-
-    public ShareRenewAcknowledgementsCompleteEvent(Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap) {
-        super(Type.SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE);
-        this.acknowledgementsMap = new HashMap<>(acknowledgementsMap);
-    }
-
-    public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
-        return acknowledgementsMap;
-    }
-
-    @Override
-    public String toStringBase() {
-        return super.toStringBase() + ", acknowledgementsMap=" + 
acknowledgementsMap;
-    }
-}
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index d2abe110ceb..f199999a8f0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -25,10 +25,9 @@ import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ShareAcquireMode;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
@@ -2763,7 +2762,7 @@ public class ShareConsumeRequestManagerTest {
                 subscriptions,
                 shareFetchConfig,
                 deserializers);
-        BackgroundEventHandler backgroundEventHandler = new 
TestableBackgroundEventHandler(time, completedAcknowledgements, renewedRecords);
+        ShareAcknowledgementEventHandler acknowledgementEventHandler = new 
TestableShareAcknowledgementEventHandler(completedAcknowledgements, 
renewedRecords);
         shareConsumeRequestManager = spy(new 
TestableShareConsumeRequestManager<>(
                 logContext,
                 groupId,
@@ -2771,7 +2770,7 @@ public class ShareConsumeRequestManagerTest {
                 subscriptionState,
                 shareFetchConfig,
                 new ShareFetchBuffer(logContext),
-                backgroundEventHandler,
+                acknowledgementEventHandler,
                 metricsManager,
                 shareFetchCollector));
     }
@@ -2810,11 +2809,11 @@ public class ShareConsumeRequestManagerTest {
                                                   SubscriptionState 
subscriptions,
                                                   ShareFetchConfig 
shareFetchConfig,
                                                   ShareFetchBuffer 
shareFetchBuffer,
-                                                  BackgroundEventHandler 
backgroundEventHandler,
+                                                  
ShareAcknowledgementEventHandler acknowledgementEventHandler,
                                                   ShareFetchMetricsManager 
metricsManager,
                                                   ShareFetchCollector<K, V> 
fetchCollector) {
             super(time, logContext, groupId, metadata, subscriptions, 
shareFetchConfig, shareFetchBuffer,
-                    backgroundEventHandler, metricsManager, retryBackoffMs, 
1000);
+                acknowledgementEventHandler, metricsManager, retryBackoffMs, 
1000);
             this.shareFetchCollector = fetchCollector;
             onMemberEpochUpdated(Optional.empty(), 
Uuid.randomUuid().toString());
         }
@@ -2955,23 +2954,20 @@ public class ShareConsumeRequestManagerTest {
         }
     }
 
-    private static class TestableBackgroundEventHandler extends 
BackgroundEventHandler {
+    private static class TestableShareAcknowledgementEventHandler extends 
ShareAcknowledgementEventHandler {
         List<Map<TopicIdPartition, Acknowledgements>> 
completedAcknowledgements;
         Set<Long> renewedRecords;
 
-        public TestableBackgroundEventHandler(Time time, 
List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements, 
Set<Long> renewedRecords) {
-            super(new LinkedBlockingQueue<>(), time, 
mock(AsyncConsumerMetrics.class));
+        public 
TestableShareAcknowledgementEventHandler(List<Map<TopicIdPartition, 
Acknowledgements>> completedAcknowledgements, Set<Long> renewedRecords) {
+            super(new LinkedBlockingQueue<>());
             this.completedAcknowledgements = completedAcknowledgements;
             this.renewedRecords = renewedRecords;
         }
 
-        public void add(BackgroundEvent event) {
-            if (event.type() == 
BackgroundEvent.Type.SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK) {
-                ShareAcknowledgementCommitCallbackEvent 
shareAcknowledgementCommitCallbackEvent = 
(ShareAcknowledgementCommitCallbackEvent) event;
-                
completedAcknowledgements.add(shareAcknowledgementCommitCallbackEvent.acknowledgementsMap());
-            } else if (event.type() == 
BackgroundEvent.Type.SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE) {
-                ShareRenewAcknowledgementsCompleteEvent 
shareRenewAcknowledgementsCompleteEvent = 
(ShareRenewAcknowledgementsCompleteEvent) event;
-                
shareRenewAcknowledgementsCompleteEvent.acknowledgementsMap().values().forEach(acks
 ->
+        public void add(ShareAcknowledgementEvent event) {
+            completedAcknowledgements.add(event.acknowledgementsMap());
+            if (event.checkForRenewAcknowledgements()) {
+                event.acknowledgementsMap().values().forEach(acks ->
                     acks.getAcknowledgementsTypeMap().forEach((offset, 
ackType) -> renewedRecords.add(offset)));
             }
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 6e819a6888a..489b0d78200 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -27,11 +27,10 @@ import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
 import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -41,6 +40,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -107,6 +107,7 @@ public class ShareConsumerImplTest {
     private final ShareFetchCollector<String, String> fetchCollector = 
mock(ShareFetchCollector.class);
     private final ShareConsumerMetadata metadata = 
mock(ShareConsumerMetadata.class);
     private final ApplicationEventHandler applicationEventHandler = 
mock(ApplicationEventHandler.class);
+    private final LinkedBlockingQueue<ShareAcknowledgementEvent> 
acknowledgementEventQueue = new LinkedBlockingQueue<>();
     private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = 
new LinkedBlockingQueue<>();
     private final CompletableEventReaper backgroundEventReaper = 
mock(CompletableEventReaper.class);
 
@@ -147,6 +148,7 @@ public class ShareConsumerImplTest {
                 (a, b, c, d, e, f, g, h) -> applicationEventHandler,
                 a -> backgroundEventReaper,
                 (a, b, c, d, e) -> fetchCollector,
+                acknowledgementEventQueue,
                 backgroundEventQueue
         );
     }
@@ -181,6 +183,7 @@ public class ShareConsumerImplTest {
                 fetchCollector,
                 time,
                 applicationEventHandler,
+                acknowledgementEventQueue,
                 backgroundEventQueue,
                 backgroundEventReaper,
                 new Metrics(),
@@ -517,8 +520,8 @@ public class ShareConsumerImplTest {
         Acknowledgements acks = Acknowledgements.empty();
         acks.add(0, AcknowledgeType.RENEW);
         acks.complete(null);
-        ShareRenewAcknowledgementsCompleteEvent e = new 
ShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
-        backgroundEventQueue.add(e);
+        ShareAcknowledgementEvent e = new 
ShareAcknowledgementEvent(Map.of(tip, acks), true);
+        acknowledgementEventQueue.add(e);
 
         records = consumer.poll(Duration.ofMillis(100));
         assertEquals(1, records.count(), "Should have received 1 record");
@@ -898,13 +901,13 @@ public class ShareConsumerImplTest {
         Metrics metrics = consumer.metricsRegistry();
         AsyncConsumerMetrics asyncConsumerMetrics = 
consumer.asyncConsumerMetrics();
 
-        ShareAcknowledgementCommitCallbackEvent event = new 
ShareAcknowledgementCommitCallbackEvent(Map.of());
+        ErrorEvent event = new ErrorEvent(new InvalidRecordStateException("The 
record is in the wrong state"));
         backgroundEventQueue.add(event);
         asyncConsumerMetrics.recordBackgroundEventQueueSize(1);
 
         assertEquals(1, (double) 
metrics.metric(metrics.metricName("background-event-queue-size", 
CONSUMER_SHARE_METRIC_GROUP)).metricValue());
 
-        consumer.processBackgroundEvents();
+        assertThrows(InvalidRecordStateException.class, () -> 
consumer.processBackgroundEvents());
         assertEquals(0, (double) 
metrics.metric(metrics.metricName("background-event-queue-size", 
CONSUMER_SHARE_METRIC_GROUP)).metricValue());
     }
 

Reply via email to