This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 2bd6b99 KAFKA-13155; Fix concurrent modification in consumer shutdown
(#11164)
2bd6b99 is described below
commit 2bd6b993075f09610120d1bf205e026824088218
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Aug 10 09:39:59 2021 -0700
KAFKA-13155; Fix concurrent modification in consumer shutdown (#11164)
The `TransactionalMessageCopier` tool, which is used in system tests
attempts to close the consumer as part of a shutdown hook. Although the access
is synchronized, there is no guarantee that the consumer has finished polling
when shutdown is invoked. The patch fixes the problem by call `wakeup()` from
the shutdown hook and pushing the call to `close()` to the main thread.
Reviewers: David Jacot <[email protected]>
---
.../kafka/tools/TransactionalMessageCopier.java | 24 +++++++++++-----------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 230402e..3e0e3a3 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -35,7 +35,9 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -325,11 +327,7 @@ public class TransactionalMessageCopier {
Exit.addShutdownHook("transactional-message-copier-shutdown-hook", ()
-> {
isShuttingDown.set(true);
- // Flush any remaining messages
- producer.close();
- synchronized (consumer) {
- consumer.close();
- }
+ consumer.wakeup();
System.out.println(shutDownString(totalMessageProcessed.get(),
numMessagesProcessedSinceLastRebalance.get(),
remainingMessages.get(), transactionalId));
});
@@ -337,11 +335,9 @@ public class TransactionalMessageCopier {
final boolean useGroupMetadata =
parsedArgs.getBoolean("useGroupMetadata");
try {
Random random = new Random();
- while (remainingMessages.get() > 0) {
+ while (!isShuttingDown.get() && remainingMessages.get() > 0) {
System.out.println(statusAsJson(totalMessageProcessed.get(),
numMessagesProcessedSinceLastRebalance.get(),
remainingMessages.get(), transactionalId, "ProcessLoop"));
- if (isShuttingDown.get())
- break;
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(200));
if (records.count() > 0) {
@@ -374,11 +370,15 @@ public class TransactionalMessageCopier {
}
}
}
- } finally {
- producer.close();
- synchronized (consumer) {
- consumer.close();
+ } catch (WakeupException e) {
+ if (!isShuttingDown.get()) {
+ // Let the exception propagate if the exception was not raised
+ // as part of shutdown.
+ throw e;
}
+ } finally {
+ Utils.closeQuietly(producer, "producer");
+ Utils.closeQuietly(consumer, "consumer");
}
Exit.exit(0);
}