This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ebae768bd89 KAFKA-18193 Refactor Kafka Streams CloseOptions to Fluent API Style (#19955)
ebae768bd89 is described below

commit ebae768bd89f8ee4f34a8c5fb05229e488fbdc82
Author: Ken Huang <[email protected]>
AuthorDate: Tue Oct 7 20:50:18 2025 +0800

    KAFKA-18193 Refactor Kafka Streams CloseOptions to Fluent API Style (#19955)
    
    In Kafka Streams, configuration classes typically follow a fluent API
    pattern to ensure a consistent and intuitive developer experience.
    However, the current implementation of
    `org.apache.kafka.streams.KafkaStreams$CloseOptions` deviates from this
    convention by exposing a public constructor, breaking the uniformity
    expected across the API.
    
    To address this inconsistency, we propose introducing a new
    `CloseOptions` class that adheres to the fluent API style, replacing the
    existing implementation. The new class will retain the existing
    `timeout(Duration)` and `leaveGroup(boolean)` methods but will enforce
    fluent instantiation and configuration. Given the design shift, we will
    not maintain backward compatibility with the current class.
    
    This change aligns with the goal of standardizing configuration objects
    across Kafka Streams, offering developers a more cohesive and
    predictable API.
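    
    For illustration, a minimal migration sketch (assuming a running
    `KafkaStreams` instance named `streams` and a `java.time.Duration`
    import; the names here are illustrative only):
    
        // old, deprecated KafkaStreams.CloseOptions
        streams.close(new KafkaStreams.CloseOptions()
                .timeout(Duration.ofSeconds(30))
                .leaveGroup(true));
    
        // new, fluent org.apache.kafka.streams.CloseOptions
        streams.close(CloseOptions.timeout(Duration.ofSeconds(30))
                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));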
    
    Reviewers: Bill Bejeck <[email protected]>
---
 docs/upgrade.html                                  |   7 +
 .../KafkaStreamsCloseOptionsIntegrationTest.java   |   4 +-
 .../org/apache/kafka/streams/CloseOptions.java     |  98 ++++++++++
 .../org/apache/kafka/streams/KafkaStreams.java     |  60 +++++--
 .../streams/internals/CloseOptionsInternal.java    |  37 ++++
 .../streams/processor/internals/StreamThread.java  |  32 ++--
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  49 +++--
 .../processor/internals/StreamThreadTest.java      | 197 +++++++++++----------
 .../streams/DeleteStreamsGroupOffsetTest.java      |   6 +-
 .../tools/streams/DeleteStreamsGroupTest.java      |   6 +-
 10 files changed, 334 insertions(+), 162 deletions(-)

diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4affedf60c7..b8a0614456c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -189,6 +189,13 @@
         A new metric <code>AvgIdleRatio</code> has been added to the <code>ControllerEventManager</code> group. This metric measures the average idle ratio of the controller event queue thread,
         providing visibility into how much time the controller spends waiting for events versus processing them. The metric value ranges from 0.0 (always busy) to 1.0 (always idle).
     </li>
+    <li>
+        Deprecated <code>org.apache.kafka.streams.KafkaStreams$CloseOptions</code> and its related methods, such as
+        <code>KafkaStreams#close(org.apache.kafka.streams.KafkaStreams$CloseOptions)</code>.
+        As a replacement, please use <code>org.apache.kafka.streams.CloseOptions</code> and
+        <code>KafkaStreams#close(org.apache.kafka.streams.CloseOptions)</code>.
+        For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/QAq9F">KIP-1153</a>.
+    </li>
 </ul>
 
 <h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
index 97fc2b40ba8..5f690a794f2 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -28,8 +28,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.streams.CloseOptions;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.CloseOptions;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -159,7 +159,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
         IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
 
-        streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
+        streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP).withTimeout(Duration.ofSeconds(30)));
         waitForEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java
new file mode 100644
index 00000000000..25be8530a4c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+public class CloseOptions {
+    /**
+     * Enum to specify the group membership operation upon closing the Kafka Streams application.
+     *
+     * <ul>
+     *   <li><b>{@code LEAVE_GROUP}</b>: means the consumer will leave the group.</li>
+     *   <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in the group.</li>
+     * </ul>
+     */
+    public enum GroupMembershipOperation {
+        LEAVE_GROUP,
+        REMAIN_IN_GROUP
+    }
+
+    /**
+     * Specifies the group membership operation upon shutdown.
+     * By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be applied, which follows the KafkaStreams default behavior.
+     */
+    protected GroupMembershipOperation operation = GroupMembershipOperation.REMAIN_IN_GROUP;
+
+    /**
+     * Specifies the maximum amount of time to wait for the close process to complete.
+     * This allows users to define a custom timeout for gracefully stopping the KafkaStreams.
+     */
+    protected Optional<Duration> timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));
+
+    private CloseOptions() {
+    }
+
+    protected CloseOptions(final CloseOptions closeOptions) {
+        this.operation = closeOptions.operation;
+        this.timeout = closeOptions.timeout;
+    }
+
+    /**
+     * Static method to create a {@code CloseOptions} with a custom timeout.
+     *
+     * @param timeout the maximum time to wait for the KafkaStreams to close.
+     * @return a new {@code CloseOptions} instance with the specified timeout.
+     */
+    public static CloseOptions timeout(final Duration timeout) {
+        return new CloseOptions().withTimeout(timeout);
+    }
+
+    /**
+     * Static method to create a {@code CloseOptions} with a specified group membership operation.
+     *
+     * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
+     * @return a new {@code CloseOptions} instance with the specified group membership operation.
+     */
+    public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
+        return new CloseOptions().withGroupMembershipOperation(operation);
+    }
+
+    /**
+     * Fluent method to set the timeout for the close process.
+     *
+     * @param timeout the maximum time to wait for the KafkaStreams to close. If {@code null}, the default timeout will be used.
+     * @return this {@code CloseOptions} instance.
+     */
+    public CloseOptions withTimeout(final Duration timeout) {
+        this.timeout = Optional.ofNullable(timeout);
+        return this;
+    }
+
+    /**
+     * Fluent method to set the group membership operation upon shutdown.
+     *
+     * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
+     * @return this {@code CloseOptions} instance.
+     */
+    public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
+        this.operation = Objects.requireNonNull(operation, "operation should not be null");
+        return this;
+    }
+}
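
As a usage sketch of the class above (assuming a running `KafkaStreams` instance named `streams`): either static factory can serve as the entry point, and the with* methods chain onto it.

    // start from the timeout factory and add the membership operation
    streams.close(CloseOptions.timeout(Duration.ofMinutes(1))
            .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));

    // or start from the membership factory and keep the default (effectively unbounded) timeout
    streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
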
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index dbf101fd9af..999befa3495 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -49,6 +49,7 @@ import org.apache.kafka.streams.errors.StreamsStoppedException;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.UnknownStateStoreException;
 import org.apache.kafka.streams.internals.ClientInstanceIdsImpl;
+import org.apache.kafka.streams.internals.CloseOptionsInternal;
 import org.apache.kafka.streams.internals.metrics.ClientMetrics;
 import org.apache.kafka.streams.internals.metrics.StreamsClientMetricsDelegatingReporter;
 import org.apache.kafka.streams.processor.StandbyUpdateListener;
@@ -488,7 +489,7 @@ public class KafkaStreams implements AutoCloseable {
             closeToError();
         }
         final StreamThread deadThread = (StreamThread) Thread.currentThread();
-        deadThread.shutdown(false);
+        deadThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
         addStreamThread();
         if (throwable instanceof RuntimeException) {
             throw (RuntimeException) throwable;
@@ -765,7 +766,7 @@ public class KafkaStreams implements AutoCloseable {
 
         @Override
         public void onUpdateStart(final TopicPartition topicPartition,
-                          final String storeName, 
+                          final String storeName,
                           final long startingOffset) {
             if (userStandbyListener != null) {
                 try {
@@ -1136,7 +1137,7 @@ public class KafkaStreams implements AutoCloseable {
                     return Optional.of(streamThread.getName());
                 } else {
                     log.warn("Terminating the new thread because the Kafka 
Streams client is in state {}", state);
-                    streamThread.shutdown(true);
+                    
streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
                     threads.remove(streamThread);
                     final long cacheSizePerThread = 
cacheSizePerThread(numLiveStreamThreads());
                     log.info("Resizing thread cache due to terminating added 
thread, new cache size per thread is {}", cacheSizePerThread);
@@ -1200,7 +1201,7 @@ public class KafkaStreams implements AutoCloseable {
                     final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
                     if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
                         log.info("Removing StreamThread {}", streamThread.getName());
-                        streamThread.shutdown(true);
+                        streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
                         if (callingThreadIsNotCurrentStreamThread) {
                             final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
                             if (remainingTimeMs <= 0 || !streamThread.waitOnThreadState(StreamThread.State.DEAD, remainingTimeMs)) {
@@ -1418,15 +1419,18 @@ public class KafkaStreams implements AutoCloseable {
     /**
      * Class that handles options passed in case of {@code KafkaStreams} instance scale down
      */
+    @Deprecated(since = "4.2")
     public static class CloseOptions {
         private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
         private boolean leaveGroup = false;
 
+        @Deprecated(since = "4.2")
         public CloseOptions timeout(final Duration timeout) {
             this.timeout = timeout;
             return this;
         }
 
+        @Deprecated(since = "4.2")
         public CloseOptions leaveGroup(final boolean leaveGroup) {
             this.leaveGroup = leaveGroup;
             return this;
@@ -1438,10 +1442,14 @@ public class KafkaStreams implements AutoCloseable {
      * This will block until all threads have stopped.
      */
     public void close() {
-        close(Optional.empty(), false);
+        close(Optional.empty(), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     }
 
-    private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
+    private Thread shutdownHelper(
+        final boolean error,
+        final long timeoutMs,
+        final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation
+    ) {
         stateDirCleaner.shutdownNow();
         if (rocksDBMetricsRecordingService != null) {
             rocksDBMetricsRecordingService.shutdownNow();
@@ -1453,7 +1461,9 @@ public class KafkaStreams implements AutoCloseable {
         return new Thread(() -> {
             // notify all the threads to stop; avoid deadlocks by stopping any
             // further state reports from the thread since we're shutting down
-            int numStreamThreads = processStreamThread(streamThread -> streamThread.shutdown(leaveGroup));
+            int numStreamThreads = processStreamThread(
+                streamThread -> streamThread.shutdown(operation)
+            );
 
             log.info("Shutting down {} stream threads", numStreamThreads);
 
@@ -1513,7 +1523,7 @@ public class KafkaStreams implements AutoCloseable {
         }, clientId + "-CloseThread");
     }
 
-    private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
+    private boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
         final long timeoutMs;
         if (timeout.isPresent()) {
             timeoutMs = timeout.get();
@@ -1544,7 +1554,7 @@ public class KafkaStreams implements AutoCloseable {
                 + "PENDING_SHUTDOWN, PENDING_ERROR, ERROR, or NOT_RUNNING");
         }
 
-        final Thread shutdownThread = shutdownHelper(false, timeoutMs, leaveGroup);
+        final Thread shutdownThread = shutdownHelper(false, timeoutMs, operation);
 
         shutdownThread.setDaemon(true);
         shutdownThread.start();
@@ -1562,7 +1572,7 @@ public class KafkaStreams implements AutoCloseable {
         if (!setState(State.PENDING_ERROR)) {
             log.info("Skipping shutdown since we are already in {}", state());
         } else {
-            final Thread shutdownThread = shutdownHelper(true, -1, false);
+            final Thread shutdownThread = shutdownHelper(true, -1, org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
 
             shutdownThread.setDaemon(true);
             shutdownThread.start();
@@ -1588,12 +1598,13 @@ public class KafkaStreams implements AutoCloseable {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
 
-        return close(Optional.of(timeoutMs), false);
+        return close(Optional.of(timeoutMs), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     }
 
     /**
      * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
      * threads to join.
+     * This method is deprecated and replaced by {@link #close(org.apache.kafka.streams.CloseOptions)}.
      * @param options  contains timeout to specify how long to wait for the threads to shut down, and a flag leaveGroup to
      *                 trigger consumer leave call
      * @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
@@ -1601,15 +1612,36 @@ public class KafkaStreams implements AutoCloseable {
      * Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
      * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
      */
+    @Deprecated(since = "4.2")
     public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
+        final org.apache.kafka.streams.CloseOptions closeOptions = org.apache.kafka.streams.CloseOptions.timeout(options.timeout)
+                .withGroupMembershipOperation(options.leaveGroup ?
+                        org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP :
+                        org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        return close(closeOptions);
+    }
+
+    /**
+     * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
+     * threads to join.
+     * @param options  contains timeout to specify how long to wait for the threads to shut down,
+     *                 and a {@link org.apache.kafka.streams.CloseOptions.GroupMembershipOperation}
+     *                 to trigger consumer leave call or remain in the group
+     * @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
+     * before all threads stopped
+     * Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
+     * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
+     */
+    public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
         Objects.requireNonNull(options, "options cannot be null");
-        final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
-        final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
+        final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
+        final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
 
-        return close(Optional.of(timeoutMs), options.leaveGroup);
+        return close(Optional.of(timeoutMs), optionsInternal.operation());
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/CloseOptionsInternal.java b/streams/src/main/java/org/apache/kafka/streams/internals/CloseOptionsInternal.java
new file mode 100644
index 00000000000..0211c6cd431
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/CloseOptionsInternal.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.streams.CloseOptions;
+
+import java.time.Duration;
+import java.util.Optional;
+
+public class CloseOptionsInternal extends CloseOptions {
+
+    public CloseOptionsInternal(final CloseOptions options) {
+        super(options);
+    }
+
+    public GroupMembershipOperation operation() {
+        return operation;
+    }
+
+    public Optional<Duration> timeout() {
+        return timeout;
+    }
+}
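
A short sketch of the accessor pattern above (assuming an already-built `CloseOptions` value; the subclass simply exposes the protected fields to code inside the streams package):

    CloseOptions options = CloseOptions.timeout(Duration.ofSeconds(30));
    CloseOptionsInternal internal = new CloseOptionsInternal(options);
    Optional<Duration> timeout = internal.timeout();                  // Optional[PT30S]
    CloseOptions.GroupMembershipOperation op = internal.operation();  // REMAIN_IN_GROUP (default)
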
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 4c16577972b..623e4b6c45f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -91,9 +91,9 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
@@ -367,7 +367,8 @@ public class StreamThread extends Thread implements 
ProcessingThread {
 
     // These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
     private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
-    private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
+    private final AtomicReference<org.apache.kafka.streams.CloseOptions.GroupMembershipOperation> leaveGroupRequested =
+        new AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
     private final boolean eosEnabled;
     private final boolean stateUpdaterEnabled;
@@ -898,7 +899,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             cleanRun = runLoop();
         } catch (final Throwable e) {
             failedStreamThreadSensor.record();
-            leaveGroupRequested.set(true);
+            leaveGroupRequested.set(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             streamsUncaughtExceptionHandler.accept(e, false);
             // Note: the above call currently rethrows the exception, so nothing below this line will be executed
         } finally {
@@ -1547,7 +1548,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         if (streamsRebalanceData.isPresent()) {
             boolean hasMissingSourceTopics = false;
             String missingTopicsDetail = null;
-            
+
             for (final StreamsGroupHeartbeatResponseData.Status status : 
streamsRebalanceData.get().statuses()) {
                 if (status.statusCode() == 
StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
                     shutdownErrorHook.run();
@@ -1560,7 +1561,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                     throw new TopologyException(errorMsg);
                 }
             }
-            
+
             if (hasMissingSourceTopics) {
                 handleMissingSourceTopicsWithTimeout(missingTopicsDetail);
             } else {
@@ -1589,25 +1590,25 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         // Start timeout tracking on first encounter with missing topics
         if (topicsReadyTimer == null) {
             topicsReadyTimer = time.timer(maxPollTimeMs);
-            log.info("Missing source topics detected: {}. Will wait up to {}ms 
before failing.", 
+            log.info("Missing source topics detected: {}. Will wait up to {}ms 
before failing.",
                 missingTopicsDetail, maxPollTimeMs);
         } else {
             topicsReadyTimer.update();
         }
-        
+
         if (topicsReadyTimer.isExpired()) {
             final long elapsedTime = topicsReadyTimer.elapsedMs();
-            final String errorMsg = String.format("Missing source topics: %s. 
Timeout exceeded after %dms.", 
+            final String errorMsg = String.format("Missing source topics: %s. 
Timeout exceeded after %dms.",
                 missingTopicsDetail, elapsedTime);
             log.error(errorMsg);
-            
+
             throw new MissingSourceTopicException(errorMsg);
         } else {
-            log.debug("Missing source topics: {}. Elapsed time: {}ms, timeout 
in: {}ms", 
+            log.debug("Missing source topics: {}. Elapsed time: {}ms, timeout 
in: {}ms",
                 missingTopicsDetail, topicsReadyTimer.elapsedMs(), 
topicsReadyTimer.remainingMs());
         }
     }
-    
+
 
     static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(final 
Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
         final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new 
HashMap<>();
@@ -1879,12 +1880,12 @@ public class StreamThread extends Thread implements 
ProcessingThread {
      * Note that there is nothing to prevent this function from being called multiple times
      * (e.g., in testing), hence the state is set only the first time
      *
-     * @param leaveGroup this flag will control whether the consumer will leave the group on close or not
+     * @param operation the group membership operation to apply on shutdown. Must be one of LEAVE_GROUP or REMAIN_IN_GROUP.
      */
-    public void shutdown(final boolean leaveGroup) {
+    public void shutdown(final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
         log.info("Informed to shut down");
         final State oldState = setState(State.PENDING_SHUTDOWN);
-        leaveGroupRequested.set(leaveGroup);
+        leaveGroupRequested.set(operation);
         if (oldState == State.CREATED) {
             // The thread may not have been started. Take responsibility for shutting down
             completeShutdown(true);
@@ -1917,7 +1918,8 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             log.error("Failed to close changelog reader due to the following 
error:", e);
         }
         try {
-            final GroupMembershipOperation membershipOperation = 
leaveGroupRequested.get() ? LEAVE_GROUP : REMAIN_IN_GROUP;
+            final GroupMembershipOperation membershipOperation =
+                leaveGroupRequested.get() == 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP ? 
LEAVE_GROUP : REMAIN_IN_GROUP;
             
mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation));
         } catch (final Throwable e) {
             log.error("Failed to close consumer due to the following error:", 
e);
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index ab292e572e3..1ceebab1cd7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.MockAdminClient;
-import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaFuture;
@@ -310,8 +309,12 @@ public class KafkaStreamsTest {
 
     private void prepareConsumer(final StreamThread thread, final AtomicReference<StreamThread.State> state) {
         doAnswer(invocation -> {
-            supplier.consumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
-            supplier.restoreConsumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
+            supplier.consumer.close(
+                org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
+            );
+            supplier.restoreConsumer.close(
+                org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
+            );
             for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
                 producer.close();
             }
@@ -320,7 +323,7 @@ public class KafkaStreamsTest {
             threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
             threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
             return null;
-        }).when(thread).shutdown(false);
+        }).when(thread).shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     }
     }
 
     private void prepareThreadLock(final StreamThread thread) {
@@ -571,7 +574,7 @@ public class KafkaStreamsTest {
 
             for (int i = 0; i < NUM_THREADS; i++) {
                 final StreamThread tmpThread = streams.threads.get(i);
-                tmpThread.shutdown(false);
+                tmpThread.shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
                 waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
                     "Thread never stopped.");
                 streams.threads.get(i).join();
@@ -790,7 +793,7 @@ public class KafkaStreamsTest {
         prepareThreadLock(streamThreadTwo);
         try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
-            streamThreadOne.shutdown(true);
+            streamThreadOne.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             final Set<ThreadMetadata> threads = streams.metadataForLocalThreads();
             assertThat(threads.size(), equalTo(1));
             assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
@@ -1016,9 +1019,8 @@ public class KafkaStreamsTest {
                 () -> streams.state() == KafkaStreams.State.RUNNING,
                 "Streams never started.");
 
-            final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-            closeOptions.timeout(Duration.ZERO);
-            closeOptions.leaveGroup(true);
+            final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO)
+                    .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
 
             streams.close(closeOptions);
             assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
@@ -1041,8 +1043,7 @@ public class KafkaStreamsTest {
                 () -> streams.state() == KafkaStreams.State.RUNNING,
                 "Streams never started.");
 
-            final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-            closeOptions.timeout(Duration.ZERO);
+            final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO);
 
             streams.close(closeOptions);
             assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
@@ -1229,8 +1230,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadTwo, 2);
         prepareTerminableThread(streamThreadOne);
 
-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ofMillis(10L));
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(10L));
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier)) {
             assertFalse(streams.close(closeOptions));
         }
@@ -1243,8 +1243,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadTwo, 2);
         prepareTerminableThread(streamThreadOne);
 
-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ofMillis(-1L));
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(-1L));
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier, time)) {
             assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
         }
@@ -1257,8 +1256,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadTwo, 2);
         prepareTerminableThread(streamThreadOne);
 
-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ZERO);
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO);
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier)) {
             assertFalse(streams.close(closeOptions));
         }
@@ -1275,9 +1273,8 @@ public class KafkaStreamsTest {
 
         when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
 
-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ofMillis(10L));
-        closeOptions.leaveGroup(true);
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(10L))
+                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier)) {
             assertFalse(streams.close(closeOptions));
         }
@@ -1293,9 +1290,8 @@ public class KafkaStreamsTest {
         final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class);
         when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
 
-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ofMillis(-1L));
-        closeOptions.leaveGroup(true);
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(-1L))
+                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
             assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
         }
@@ -1312,9 +1308,8 @@ public class KafkaStreamsTest {
 
         when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
 
-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ZERO);
-        closeOptions.leaveGroup(true);
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO)
+                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier)) {
             assertFalse(streams.close(closeOptions));
         }
@@ -1720,7 +1715,7 @@ public class KafkaStreamsTest {
         producerFuture.complete(producerInstanceId);
         final Uuid adminInstanceId = Uuid.randomUuid();
         adminClient.setClientInstanceId(adminInstanceId);
-        
+
         final Map<String, KafkaFuture<Uuid>> expectedClientIds = Map.of("main-consumer", consumerFuture, "some-thread-producer", producerFuture);
         when(streamThreadOne.clientInstanceIds(any())).thenReturn(expectedClientIds);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 44c4ea9a869..bb9a871bab2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -60,6 +60,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.CloseOptions;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@@ -247,7 +248,7 @@ public class StreamThreadTest {
             if (thread.state() != State.CREATED) {
                 thread.taskManager().shutdown(false);
             }
-            thread.shutdown(true);
+            thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             thread = null;
         }
         final Set<Thread> t = 
Collections.unmodifiableSet(Thread.getAllStackTraces().keySet());
@@ -409,12 +410,12 @@ public class StreamThreadTest {
         assertEquals(4, stateListener.numChanges);
         assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, 
stateListener.oldState);
 
-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         assertSame(StreamThread.State.PENDING_SHUTDOWN, thread.state());
     }
 
     @ParameterizedTest
-    @MethodSource("data")    
+    @MethodSource("data")
     public void shouldChangeStateAtStartClose(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
         thread = createStreamThread(CLIENT_ID, new MockTime(1), 
stateUpdaterEnabled, processingThreadsEnabled);
 
@@ -427,18 +428,18 @@ public class StreamThreadTest {
             10 * 1000,
             "Thread never started.");
 
-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         TestUtils.waitForCondition(
             () -> thread.state() == StreamThread.State.DEAD,
             10 * 1000,
             "Thread never shut down.");
 
-        thread.shutdown(true);
-        assertEquals(thread.state(), StreamThread.State.DEAD);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+        assertEquals(State.DEAD, thread.state());
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldCreateMetricsAtStartup(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         thread = createStreamThread(CLIENT_ID, new MockTime(1), 
stateUpdaterEnabled, processingThreadsEnabled);
         final String defaultGroupName = "stream-thread-metrics";
@@ -538,7 +539,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotCommitBeforeTheCommitInterval(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final long commitInterval = 1000L;
         final Properties props = configProps(false, stateUpdaterEnabled, 
processingThreadsEnabled);
@@ -565,7 +566,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotPurgeBeforeThePurgeInterval(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final long commitInterval = 1000L;
         final long purgeInterval = 2000L;
@@ -593,7 +594,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldAlsoPurgeWhenNothingGetsCommitted(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final long purgeInterval = 1000L;
         final Properties props = configProps(false, stateUpdaterEnabled, 
processingThreadsEnabled);
@@ -658,7 +659,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotProcessWhenPartitionRevoked(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         assumeFalse(processingThreadsEnabled);
 
@@ -682,7 +683,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldProcessWhenRunning(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) {
         assumeFalse(processingThreadsEnabled);
         final Properties props = configProps(false, stateUpdaterEnabled, 
processingThreadsEnabled);
@@ -707,7 +708,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldProcessWhenPartitionAssigned(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         assumeTrue(stateUpdaterEnabled);
         assumeFalse(processingThreadsEnabled);
@@ -732,7 +733,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldProcessWhenStarting(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) {
         assumeTrue(stateUpdaterEnabled);
         assumeFalse(processingThreadsEnabled);
@@ -757,7 +758,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws 
InterruptedException {
         final Time mockTime = new MockTime(1);
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
@@ -812,7 +813,7 @@ public class StreamThreadTest {
             10 * 1000,
             "Thread never started.");
 
-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         TestUtils.waitForCondition(
             () -> thread.state() == StreamThread.State.DEAD,
             10 * 1000,
@@ -822,7 +823,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws 
InterruptedException {
         final Time mockTime = new MockTime(1);
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
@@ -880,7 +881,7 @@ public class StreamThreadTest {
             () -> { }
         );
 
-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
 
         // Validate that the scheduled rebalance wasn't reset then set to 
MAX_VALUE so we
         // don't trigger one before we can shut down, since the rebalance must 
be ended
@@ -918,7 +919,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         // With processing threads, there is no guarantee how many iterations 
will be performed
         assumeFalse(processingThreadsEnabled);
@@ -1047,7 +1048,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotCauseExceptionIfNothingCommitted(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final long commitInterval = 1000L;
         final Properties props = configProps(false, stateUpdaterEnabled, 
processingThreadsEnabled);
@@ -1076,7 +1077,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldCommitAfterCommitInterval(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final long commitInterval = 100L;
         final long commitLatency = 10L;
@@ -1137,7 +1138,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldPurgeAfterPurgeInterval(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final long commitInterval = 100L;
         final long purgeInterval = 200L;
@@ -1170,7 +1171,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldRecordCommitLatency(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) {
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
         when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
@@ -1279,7 +1280,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled(final
 boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);
         internalStreamsBuilder.buildAndOptimizeTopology();
@@ -1319,7 +1320,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabled(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);
 
@@ -1357,7 +1358,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws 
InterruptedException {
         // The state updater is disabled for this test because this test 
relies on the fact the mainConsumer.resume()
         // is not called. This is not true when the state updater is enabled 
which leads to
@@ -1390,7 +1391,7 @@ public class StreamThreadTest {
                 10 * 1000,
                 "Thread never started.");
 
-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
 
         // even if thread is no longer running, it should still be polling
         // as long as the rebalance is still ongoing
@@ -1411,7 +1412,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldShutdownTaskManagerOnClose(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
         when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
@@ -1426,7 +1427,7 @@ public class StreamThreadTest {
         thread.setStateListener(
             (t, newState, oldState) -> {
                 if (oldState == StreamThread.State.CREATED && newState == 
StreamThread.State.STARTING) {
-                    thread.shutdown(true);
+                    thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
                 }
             });
         thread.run();
@@ -1435,7 +1436,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotReturnDataAfterTaskMigrated(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final TaskManager taskManager = mock(TaskManager.class);
         final InternalTopologyBuilder internalTopologyBuilder = 
mock(InternalTopologyBuilder.class);
@@ -1512,7 +1513,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldShutdownTaskManagerOnCloseWithoutStart(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
         when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
@@ -1524,13 +1525,13 @@ public class StreamThreadTest {
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata)
             .updateThreadMetadata(adminClientId(CLIENT_ID));
-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
 
         verify(taskManager).shutdown(true);
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldOnlyShutdownOnce(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) {
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
         when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
@@ -1542,7 +1543,7 @@ public class StreamThreadTest {
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata)
             .updateThreadMetadata(adminClientId(CLIENT_ID));
-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         // Execute the run method. Verification of the mock will check that 
shutdown was only done once
         thread.run();
 
@@ -1550,7 +1551,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         internalTopologyBuilder.addSource(null, "name", null, null, null, 
"topic");
         internalTopologyBuilder.addSink("out", "output", null, null, null, 
"name");
@@ -1572,7 +1573,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhileProcessing(final
 boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws 
Exception {
         internalTopologyBuilder.addSource(null, "source", null, null, null, 
topic1);
         internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, 
null, "source");
@@ -1688,18 +1689,18 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks(final
 boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws 
Exception {
         testThrowingDurringCommitTransactionException(new 
ProducerFencedException("Producer is fenced"), stateUpdaterEnabled, 
processingThreadsEnabled);
     }
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldNotCloseTaskAndRemoveFromTaskManagerIfInvalidPidMappingOccurredInCommitTransactionWhenSuspendingTasks(final
 boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws 
Exception {
         testThrowingDurringCommitTransactionException(new 
InvalidPidMappingException("PidMapping is invalid"), stateUpdaterEnabled, 
processingThreadsEnabled);
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldReinitializeRevivedTasksInAnyState(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         thread = createStreamThread(CLIENT_ID, config, new MockTime(1));
@@ -1873,19 +1874,19 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting(final
 boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         
testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new 
ProducerFencedException("Producer is fenced"), stateUpdaterEnabled, 
processingThreadsEnabled);
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting(final
 boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         
testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new 
InvalidPidMappingException("PID Mapping is invalid"), stateUpdaterEnabled, 
processingThreadsEnabled);
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotCloseTaskProducerWhenSuspending(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
         final StreamsConfig config = new StreamsConfig(configProps(true, 
stateUpdaterEnabled, processingThreadsEnabled));
         thread = createStreamThread(CLIENT_ID, config);
@@ -1933,7 +1934,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         internalTopologyBuilder.addSource(null, "source", null, null, null, 
topic1);
@@ -2011,7 +2012,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
@@ -2059,7 +2060,7 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldUpdateStandbyTask(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) throws Exception {
         // Updating standby tasks on the stream thread only happens when the 
state updater is disabled
         assumeFalse(stateUpdaterEnabled);
@@ -2183,7 +2184,7 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotUpdateStandbyTaskWhenPaused(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
         // Updating standby tasks on the stream thread only happens when the 
state updater is disabled
         assumeFalse(stateUpdaterEnabled);
@@ -2243,7 +2244,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldCreateStandbyTask(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         setupInternalTopologyWithoutState(config);
@@ -2253,7 +2254,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotCreateStandbyTaskWithoutStateStores(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         setupInternalTopologyWithoutState(config);
@@ -2262,7 +2263,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         setupInternalTopologyWithoutState(config);
@@ -2275,7 +2276,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     @SuppressWarnings("deprecation")
     public void shouldPunctuateActiveTask(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) {
         assumeFalse(processingThreadsEnabled);
@@ -2426,7 +2427,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldAlwaysUpdateTasksMetadataAfterChangingState(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         thread = createStreamThread(CLIENT_ID, config);
@@ -2442,7 +2443,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
         internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
             .groupByKey()
@@ -2632,7 +2633,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldThrowTaskMigratedExceptionHandlingTaskLost(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);
@@ -2660,7 +2661,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldThrowTaskMigratedExceptionHandlingRevocation(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);
@@ -2688,7 +2689,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     @SuppressWarnings("unchecked")
     public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
@@ -2749,7 +2750,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     @SuppressWarnings("unchecked")
     public void 
shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
@@ -2815,7 +2816,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     @SuppressWarnings("unchecked")
     public void 
shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
@@ -2881,7 +2882,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     @SuppressWarnings("unchecked")
     public void 
shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(true, 
stateUpdaterEnabled, processingThreadsEnabled));
@@ -2946,7 +2947,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     @SuppressWarnings("unchecked")
     public void 
shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask(final
 boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(true, 
stateUpdaterEnabled, processingThreadsEnabled));
@@ -3009,7 +3010,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotCommitNonRunningNonRestoringTasks(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final TaskManager taskManager = mock(TaskManager.class);
@@ -3048,7 +3049,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(
         final boolean stateUpdaterEnabled,
         final boolean processingThreadsEnabled
@@ -3155,7 +3156,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldTransmitTaskManagerMetrics(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
@@ -3182,7 +3183,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldConstructAdminMetrics(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) {
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final Node broker1 = new Node(0, "dummyHost-1", 1234);
@@ -3239,13 +3240,13 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldNotRecordFailedStreamThread(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         runAndVerifyFailedStreamThreadRecording(false, stateUpdaterEnabled, 
processingThreadsEnabled);
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldRecordFailedStreamThread(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         runAndVerifyFailedStreamThreadRecording(true, stateUpdaterEnabled, 
processingThreadsEnabled);
     }
@@ -3308,7 +3309,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldCheckStateUpdater(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) {
         assumeTrue(stateUpdaterEnabled);
         final Properties streamsConfigProps = configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled);
@@ -3326,7 +3327,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldCheckStateUpdaterInBetweenProcessCalls(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         assumeTrue(stateUpdaterEnabled);
         assumeFalse(processingThreadsEnabled);
@@ -3344,7 +3345,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldUpdateLagsAfterPolling(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final Properties streamsConfigProps = configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled);
         thread = setUpThread(streamsConfigProps);
@@ -3362,7 +3363,7 @@ public class StreamThreadTest {
 
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final Properties streamsConfigProps = configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled);
         thread = setUpThread(streamsConfigProps);
@@ -3377,7 +3378,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         assumeTrue(stateUpdaterEnabled);
         final Properties streamsConfigProps = configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled);
@@ -3393,7 +3394,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         assumeFalse(stateUpdaterEnabled);
         final Properties streamsConfigProps = configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled);
@@ -3407,13 +3408,13 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldGetMainAndRestoreConsumerInstanceId(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
         getClientInstanceId(false, stateUpdaterEnabled, 
processingThreadsEnabled);
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
         getClientInstanceId(true, stateUpdaterEnabled, 
processingThreadsEnabled);
     }
@@ -3460,7 +3461,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
@@ -3477,7 +3478,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void 
shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
@@ -3494,7 +3495,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
@@ -3511,7 +3512,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
         clientSupplier.consumer.disableTelemetry();
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
@@ -3528,7 +3529,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws 
Exception {
         clientSupplier.restoreConsumer.disableTelemetry();
 
@@ -3546,7 +3547,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldReturnNullIfProducerTelemetryDisabled(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
         final MockProducer<byte[], byte[]> producer = new MockProducer<>();
         producer.disableTelemetry();
@@ -3566,7 +3567,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldTimeOutOnMainConsumerInstanceId(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         clientSupplier.consumer.setClientInstanceId(Uuid.randomUuid());
         clientSupplier.consumer.injectTimeoutException(-1);
@@ -3591,7 +3592,7 @@ public class StreamThreadTest {
 
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldTimeOutOnRestoreConsumerInstanceId(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         clientSupplier.restoreConsumer.setClientInstanceId(Uuid.randomUuid());
         clientSupplier.restoreConsumer.injectTimeoutException(-1);
@@ -3616,7 +3617,7 @@ public class StreamThreadTest {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
     public void shouldTimeOutOnProducerInstanceId(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         final MockProducer<byte[], byte[]> producer = new MockProducer<>();
         producer.setClientInstanceId(Uuid.randomUuid());
@@ -3964,13 +3965,13 @@ public class StreamThreadTest {
                         
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
                         .setStatusDetail("Missing source topics")
         ));
-        
+
         // First call should not throw exception (within timeout)
         thread.runOnceWithoutProcessingThreads();
-        
+
         // Advance time beyond max.poll.interval.ms (default is 300000ms) to 
trigger timeout
         mockTime.sleep(300001);
-        
+
         final MissingSourceTopicException exception = 
assertThrows(MissingSourceTopicException.class, () -> 
thread.runOnceWithoutProcessingThreads());
         assertTrue(exception.getMessage().contains("Missing source topics"));
         assertTrue(exception.getMessage().contains("Timeout exceeded"));
@@ -4032,7 +4033,7 @@ public class StreamThreadTest {
         ));
 
         // Should immediately throw TopologyException (no timeout like 
MISSING_SOURCE_TOPICS)
-        final TopologyException exception = 
assertThrows(TopologyException.class, 
+        final TopologyException exception = 
assertThrows(TopologyException.class,
             () -> thread.runOnceWithoutProcessingThreads());
         assertTrue(exception.getMessage().contains("Topics are incorrectly 
partitioned"));
     }
@@ -4151,13 +4152,13 @@ public class StreamThreadTest {
                         
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
                         .setStatusDetail("Missing source topics")
         ));
-        
+
         // First call should not throw exception (within timeout)
         thread.runOnceWithProcessingThreads();
-        
+
         // Advance time beyond max.poll.interval.ms (default is 300000ms) to 
trigger timeout
         mockTime.sleep(300001);
-        
+
         final MissingSourceTopicException exception = 
assertThrows(MissingSourceTopicException.class, () -> 
thread.runOnceWithProcessingThreads());
         assertTrue(exception.getMessage().contains("Missing source topics"));
         assertTrue(exception.getMessage().contains("Timeout exceeded"));
@@ -4219,35 +4220,35 @@ public class StreamThreadTest {
                         
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
                         .setStatusDetail("Missing source topics")
         ));
-        
+
         // First call should not throw exception (within timeout)
         thread.runOnceWithoutProcessingThreads();
-        
+
         // Advance time but not beyond timeout
         mockTime.sleep(150000); // Half of max.poll.interval.ms
-        
+
         // Should still not throw exception
         thread.runOnceWithoutProcessingThreads();
-        
+
         // Clear the missing source topics (simulate recovery)
         streamsRebalanceData.setStatuses(List.of());
-        
+
         // Should complete without exception (recovery successful)
         assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());
-        
+
         // Set missing topics again - should reset the timeout
         streamsRebalanceData.setStatuses(List.of(
                 new StreamsGroupHeartbeatResponseData.Status()
                         
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
                         .setStatusDetail("Different missing topics")
         ));
-        
+
         // Advance time by 250 seconds to test if timer was reset
         // Total time from beginning: 150000 + 250000 = 400000ms (400s)
         // If timer was NOT reset: elapsed time = 400s > 300s → should throw
         // If timer WAS reset: elapsed time = 250s < 300s → should NOT throw
         mockTime.sleep(250000); // Advance by 250 seconds
-        
+
         // Should not throw because timer was reset - only 250s elapsed from 
reset point
         assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());
     }
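
    The MISSING_SOURCE_TOPICS assertions above pin down the intended behavior: the first
    heartbeat status only starts a timer, the exception is thrown once more than
    max.poll.interval.ms has elapsed with the status still reported, and a recovery clears
    the timer so a later occurrence starts counting from zero. A minimal sketch of that
    bookkeeping (an illustration of the asserted behavior, not the actual StreamThread code):

    // Hypothetical helper illustrating the timeout bookkeeping the tests assert.
    final class MissingSourceTopicTimerSketch {
        private final long maxPollIntervalMs;
        private Long firstObservedMs = null; // null: no missing source topics currently reported

        MissingSourceTopicTimerSketch(final long maxPollIntervalMs) {
            this.maxPollIntervalMs = maxPollIntervalMs;
        }

        // Returns true when the caller should throw MissingSourceTopicException.
        boolean shouldThrow(final boolean missingSourceTopicsReported, final long nowMs) {
            if (!missingSourceTopicsReported) {
                firstObservedMs = null;   // recovery resets the timer
                return false;
            }
            if (firstObservedMs == null) {
                firstObservedMs = nowMs;  // first observation starts the timer
            }
            return nowMs - firstObservedMs > maxPollIntervalMs;
        }
    }
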
@@ -4427,7 +4428,7 @@ public class StreamThreadTest {
             null
         );
     }
-    
+
     private void runOnce(final boolean processingThreadsEnabled) {
         if (processingThreadsEnabled) {
             thread.runOnceWithProcessingThreads();
diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
index 94e9b30acfe..b6e32e8ebda 100644
--- a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.streams.CloseOptions;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValueTimestamp;
@@ -341,9 +342,8 @@ public class DeleteStreamsGroupOffsetTest {
 
     private void stopKSApp(String appId, String topic, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
         if (streams != null) {
-            KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-            closeOptions.timeout(Duration.ofSeconds(30));
-            closeOptions.leaveGroup(true);
+            CloseOptions closeOptions = CloseOptions.timeout(Duration.ofSeconds(30))
+                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             streams.close(closeOptions);
             streams.cleanUp();
 
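
    For callers still on the deprecated nested class, the change to stopKSApp above is the
    entire migration. The side-by-side sketch below uses only the calls visible in this hunk;
    the wrapper class and method names are illustrative:

    import java.time.Duration;

    import org.apache.kafka.streams.CloseOptions;
    import org.apache.kafka.streams.KafkaStreams;

    public class CloseOptionsMigrationSketch {

        // Old style (deprecated by this commit): mutate a KafkaStreams.CloseOptions instance.
        static void closeOldStyle(final KafkaStreams streams) {
            final KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions();
            options.timeout(Duration.ofSeconds(30));
            options.leaveGroup(true);
            streams.close(options);
        }

        // New fluent style: static factory plus chained configuration, as in the updated test.
        static void closeNewStyle(final KafkaStreams streams) {
            final CloseOptions options = CloseOptions.timeout(Duration.ofSeconds(30))
                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
            streams.close(options);
        }
    }
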
diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
index 25994e60a57..80d1806ed6c 100644
--- a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.CloseOptions;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValueTimestamp;
@@ -512,9 +513,8 @@ public class DeleteStreamsGroupTest {
 
     private void stopKSApp(String appId, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
         if (streams != null) {
-            KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-            closeOptions.timeout(Duration.ofSeconds(30));
-            closeOptions.leaveGroup(true);
+            CloseOptions closeOptions = CloseOptions.timeout(Duration.ofSeconds(30))
+                    .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             streams.close(closeOptions);
             streams.cleanUp();
 
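
    Both test changes rely on the same fluent pattern: a static factory creates the options
    object and every configuration method returns it so calls can be chained. A simplified,
    self-contained illustration of that style (the field names, defaults, and the
    REMAIN_IN_GROUP/DEFAULT constants are assumptions, not the contents of the new
    CloseOptions.java):

    import java.time.Duration;
    import java.util.Optional;

    // Illustration of a fluent-style options class; not the real implementation.
    public class FluentOptionsSketch {

        public enum GroupMembershipOperation { DEFAULT, LEAVE_GROUP, REMAIN_IN_GROUP }

        private Optional<Duration> timeout = Optional.empty();
        private GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;

        private FluentOptionsSketch() { }   // no public constructor

        public static FluentOptionsSketch timeout(final Duration timeout) {
            final FluentOptionsSketch options = new FluentOptionsSketch();
            options.timeout = Optional.ofNullable(timeout);
            return options;
        }

        public FluentOptionsSketch withGroupMembershipOperation(final GroupMembershipOperation operation) {
            this.operation = operation;     // returning "this" enables chaining
            return this;
        }
    }
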
