This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 2bd6b99  KAFKA-13155; Fix concurrent modification in consumer shutdown 
(#11164)
2bd6b99 is described below

commit 2bd6b993075f09610120d1bf205e026824088218
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Aug 10 09:39:59 2021 -0700

    KAFKA-13155; Fix concurrent modification in consumer shutdown (#11164)
    
    The `TransactionalMessageCopier` tool, which is used in system tests, 
attempts to close the consumer as part of a shutdown hook. Although the access 
is synchronized, there is no guarantee that the consumer has finished polling 
when shutdown is invoked. The patch fixes the problem by calling `wakeup()` from 
the shutdown hook and pushing the call to `close()` to the main thread.
    
    Reviewers: David Jacot <[email protected]>
---
 .../kafka/tools/TransactionalMessageCopier.java    | 24 +++++++++++-----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java 
b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 230402e..3e0e3a3 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -35,7 +35,9 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -325,11 +327,7 @@ public class TransactionalMessageCopier {
 
         Exit.addShutdownHook("transactional-message-copier-shutdown-hook", () 
-> {
             isShuttingDown.set(true);
-            // Flush any remaining messages
-            producer.close();
-            synchronized (consumer) {
-                consumer.close();
-            }
+            consumer.wakeup();
             System.out.println(shutDownString(totalMessageProcessed.get(),
                 numMessagesProcessedSinceLastRebalance.get(), 
remainingMessages.get(), transactionalId));
         });
@@ -337,11 +335,9 @@ public class TransactionalMessageCopier {
         final boolean useGroupMetadata = 
parsedArgs.getBoolean("useGroupMetadata");
         try {
             Random random = new Random();
-            while (remainingMessages.get() > 0) {
+            while (!isShuttingDown.get() && remainingMessages.get() > 0) {
                 System.out.println(statusAsJson(totalMessageProcessed.get(),
                     numMessagesProcessedSinceLastRebalance.get(), 
remainingMessages.get(), transactionalId, "ProcessLoop"));
-                if (isShuttingDown.get())
-                    break;
 
                 ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(200));
                 if (records.count() > 0) {
@@ -374,11 +370,15 @@ public class TransactionalMessageCopier {
                     }
                 }
             }
-        } finally {
-            producer.close();
-            synchronized (consumer) {
-                consumer.close();
+        } catch (WakeupException e) {
+            if (!isShuttingDown.get()) {
+                // Let the exception propagate if the exception was not raised
+                // as part of shutdown.
+                throw e;
             }
+        } finally {
+            Utils.closeQuietly(producer, "producer");
+            Utils.closeQuietly(consumer, "consumer");
         }
         Exit.exit(0);
     }

Reply via email to