This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f29c43b  KAFKA-12979; Implement command to find hanging transactions (#10974)
f29c43b is described below

commit f29c43bdbb85fd7c97ad2f567dcd67f2f2ece8ef
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Jul 6 10:39:59 2021 -0700

    KAFKA-12979; Implement command to find hanging transactions (#10974)
    
    This patch implements the `find-hanging` command described in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions#KIP664:Providetoolingtodetectandaborthangingtransactions-FindingHangingTransactions.
    
    Reviewers: Luke Chen <[email protected]>, David Jacot <[email protected]>
---
 .../clients/admin/ListTransactionsOptions.java     |  15 +
 .../apache/kafka/tools/TransactionsCommand.java    | 445 ++++++++++++++-
 .../kafka/tools/TransactionsCommandTest.java       | 604 ++++++++++++++++++++-
 3 files changed, 1050 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
index 3b2d902..c23d444 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -88,4 +89,18 @@ public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOpt
             ", timeoutMs=" + timeoutMs +
             ')';
     }
+
+    /**
+     * Compares two option objects by their filters. Instances are equal when they share the
+     * same runtime class, the same filtered states and the same filtered producer ids; no
+     * other field is compared.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        ListTransactionsOptions other = (ListTransactionsOptions) o;
+        if (!Objects.equals(filteredStates, other.filteredStates)) {
+            return false;
+        }
+        return Objects.equals(filteredProducerIds, other.filteredProducerIds);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(filteredStates, filteredProducerIds);
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index 2419460..b8f1c9d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -28,10 +28,15 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.DescribeProducersOptions;
 import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTransactionsOptions;
 import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TransactionDescription;
 import org.apache.kafka.clients.admin.TransactionListing;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -43,12 +48,17 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -434,7 +444,7 @@ public abstract class TransactionsCommand {
             final Map<Integer, Collection<TransactionListing>> result;
 
             try {
-                result = admin.listTransactions()
+                result = admin.listTransactions(new ListTransactionsOptions())
                     .allByBrokerId()
                     .get();
             } catch (ExecutionException e) {
@@ -461,6 +471,436 @@ public abstract class TransactionsCommand {
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        // Largest number of topics or partitions handed to the admin client in one request.
+        private static final int MAX_BATCH_SIZE = 500;
+
+        // Column titles of the printed table, in the same order as the values of each row.
+        static final String[] HEADERS = {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        /**
+         * Creates the {@code find-hanging} command.
+         *
+         * @param time time source handed to the parent constructor; the command reads it
+         *             through {@code time.milliseconds()}
+         */
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        /**
+         * @return the sub-command name, {@code find-hanging}, under which the parser is registered
+         */
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        /**
+         * Registers the {@code find-hanging} sub-command and its four optional arguments:
+         * {@code --broker-id}, {@code --max-transaction-timeout} (minutes, defaults to 15),
+         * {@code --topic} and {@code --partition}.
+         *
+         * @param subparsers parser collection the sub-command is added to
+         */
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search (15 minutes by default)")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to (required if --partition is specified)")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        /**
+         * Runs the search for hanging transactions and prints the result table.
+         *
+         * <p>The arguments are validated first: at least one of {@code --topic} and
+         * {@code --broker-id} must be given, and {@code --partition} requires {@code --topic}.
+         * The command then collects the partitions to search, finds open transactions whose
+         * last timestamp is older than the configured timeout, resolves their transactional
+         * ids and descriptions, and prints those classified as hanging.
+         *
+         * @param admin admin client used for all lookups
+         * @param ns    parsed command line arguments
+         * @param out   stream the result table is written to
+         */
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            // The argument is given in minutes; all later comparisons are done in milliseconds.
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                // Nothing to look up; still print the (empty) table.
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        /**
+         * Determines which partitions are searched.
+         *
+         * <p>When both topic and partition are given, exactly that partition is returned
+         * without any lookup. When only the topic is given, that topic is described.
+         * Otherwise all topics are listed and described. In the two lookup cases the
+         * partitions are filtered by {@code brokerId} when it is present.
+         *
+         * @param admin     admin client used for the topic lookups
+         * @param topic     topic to restrict the search to, if any
+         * @param partition partition to restrict the search to, if any
+         * @param brokerId  broker that must host a replica of a returned partition, if any
+         * @return the partitions to search
+         */
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        /**
+         * Selects the open transactions that are considered hanging.
+         *
+         * @param openTransactionsByProducerId candidate open transactions grouped by producer id
+         * @param transactionalIds             transactional id found for each producer id
+         * @param descriptions                 transaction description per transactional id; a
+         *                                     missing or {@code null} entry means none was found
+         * @return the candidates for which no transactional id or description exists, or whose
+         *         partition is not part of the described transaction
+         */
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state.
+                    TransactionDescription description = descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : openTransactions) {
+                            // The `DescribeTransactions` API returns all partitions being
+                            // written to in an ongoing transaction and any partition which
+                            // does not yet have markers written when in the `PendingAbort` or
+                            // `PendingCommit` states. If the topic partition that we found is
+                            // among these, then we can still expect the coordinator to write
+                            // the marker. Otherwise, it is a hanging transaction.
+                            if (!description.topicPartitions().contains(openTransaction.topicPartition)) {
+                                hangingTransactions.add(openTransaction);
+                            }
+                        }
+                    }
+                }
+            });
+
+            return hangingTransactions;
+        }
+
+        /**
+         * Prints one table row per hanging transaction under the {@code HEADERS} columns.
+         *
+         * <p>The duration column is the difference between the current time and the
+         * producer's last timestamp, in whole minutes. A missing coordinator epoch or start
+         * offset is printed as {@code -1}.
+         *
+         * @param hangingTransactions transactions to print; may be empty
+         * @param out                 stream the table is written to
+         */
+        private void printHangingTransactions(
+            List<OpenTransaction> hangingTransactions,
+            PrintStream out
+        ) {
+            long currentTimeMs = time.milliseconds();
+            List<String[]> rows = new ArrayList<>(hangingTransactions.size());
+
+            for (OpenTransaction transaction : hangingTransactions) {
+                long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes(
+                    currentTimeMs - transaction.producerState.lastTimestamp());
+
+                rows.add(new String[] {
+                    transaction.topicPartition.topic(),
+                    String.valueOf(transaction.topicPartition.partition()),
+                    String.valueOf(transaction.producerState.producerId()),
+                    String.valueOf(transaction.producerState.producerEpoch()),
+                    String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)),
+                    String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
+                    String.valueOf(transaction.producerState.lastTimestamp()),
+                    String.valueOf(transactionDurationMinutes)
+                });
+            }
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+
+        /**
+         * Describes the given transactional ids in a single admin request.
+         *
+         * <p>An id whose lookup fails with {@code TransactionalIdNotFoundException} is mapped
+         * to {@code null}. Any other failure is reported through {@code printErrorAndExit}.
+         *
+         * @param admin            admin client used for the lookup
+         * @param transactionalIds ids to describe
+         * @return description per transactional id, with {@code null} for ids that were not found
+         */
+        private Map<String, TransactionDescription> describeTransactions(
+            Admin admin,
+            Collection<String> transactionalIds
+        ) throws Exception {
+            try {
+                DescribeTransactionsResult result = admin.describeTransactions(new HashSet<>(transactionalIds));
+                Map<String, TransactionDescription> descriptions = new HashMap<>();
+
+                for (String transactionalId : transactionalIds) {
+                    try {
+                        TransactionDescription description = result.description(transactionalId).get();
+                        descriptions.put(transactionalId, description);
+                    } catch (ExecutionException e) {
+                        if (e.getCause() instanceof TransactionalIdNotFoundException) {
+                            descriptions.put(transactionalId, null);
+                        } else {
+                            // Handled by the outer catch block below.
+                            throw e;
+                        }
+                    }
+                }
+
+                return descriptions;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + transactionalIds.size()
+                    + " transactions", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        /**
+         * Groups open transactions by the producer id of their producer state.
+         *
+         * @param openTransactions transactions to group
+         * @return map from producer id to its transactions, each list in input order
+         */
+        private Map<Long, List<OpenTransaction>> groupByProducerId(
+            List<OpenTransaction> openTransactions
+        ) {
+            Map<Long, List<OpenTransaction>> grouped = new HashMap<>();
+            openTransactions.forEach(openTransaction -> {
+                long producerId = openTransaction.producerState.producerId();
+                grouped.computeIfAbsent(producerId, id -> new ArrayList<>()).add(openTransaction);
+            });
+            return grouped;
+        }
+
+        private List<String> listTopics(
+            Admin admin
+        ) throws Exception {
+            try {
+                return new ArrayList<>(admin.listTopics().names().get());
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list topics", e.getCause());
+                return Collections.emptyList();
+            }
+        }
+
+        /**
+         * Collects the partitions of the given topics, describing the topics in batches of
+         * at most {@code MAX_BATCH_SIZE}.
+         *
+         * @param admin    admin client used for the lookups
+         * @param brokerId when present, only partitions with a replica on this broker are kept
+         * @param topics   topics to describe
+         * @return the matching partitions
+         */
+        private List<TopicPartition> findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics
+        ) throws Exception {
+            List<TopicPartition> matches = new ArrayList<>();
+            consumeInBatches(
+                topics,
+                MAX_BATCH_SIZE,
+                batch -> findTopicPartitions(admin, brokerId, batch, matches)
+            );
+            return matches;
+        }
+
+        /**
+         * Describes one batch of topics and appends their partitions to {@code topicPartitions}.
+         *
+         * @param admin           admin client used for the lookup
+         * @param brokerId        when present, only partitions with a replica on this broker
+         *                        are added
+         * @param topics          batch of topic names to describe
+         * @param topicPartitions output list the matching partitions are appended to
+         */
+        private void findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            try {
+                Map<String, TopicDescription> topicDescriptions = admin.describeTopics(topics).all().get();
+                topicDescriptions.forEach((topic, description) -> {
+                    description.partitions().forEach(partitionInfo -> {
+                        if (!brokerId.isPresent() || hasReplica(brokerId.get(), partitionInfo)) {
+                            topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
+            }
+        }
+
+        /**
+         * @param brokerId      broker id to look for
+         * @param partitionInfo partition whose replica list is checked
+         * @return {@code true} if any replica of the partition has the given broker id
+         */
+        private boolean hasReplica(
+            int brokerId,
+            TopicPartitionInfo partitionInfo
+        ) {
+            return partitionInfo.replicas().stream().anyMatch(node -> node.id() == brokerId);
+        }
+
+        /**
+         * Collects the candidate open transactions of all given partitions.
+         *
+         * @param admin                   admin client used for the lookups
+         * @param brokerId                broker to query, if any
+         * @param maxTransactionTimeoutMs age in milliseconds above which an open transaction
+         *                                becomes a candidate
+         * @param topicPartitions         partitions to inspect
+         * @return the candidate open transactions
+         */
+        private List<OpenTransaction> collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            List<OpenTransaction> found = new ArrayList<>();
+
+            // Every partition has to be checked. Sending them in batches of at most
+            // MAX_BATCH_SIZE keeps a single request from becoming too large for the broker.
+            consumeInBatches(
+                topicPartitions,
+                MAX_BATCH_SIZE,
+                batch -> collectCandidateOpenTransactions(
+                    admin,
+                    brokerId,
+                    maxTransactionTimeoutMs,
+                    batch,
+                    found
+                )
+            );
+
+            return found;
+        }
+
+        /**
+         * Immutable pair of a partition and the state of a producer that has an open
+         * transaction on it.
+         */
+        private static class OpenTransaction {
+            private final TopicPartition topicPartition;
+            private final ProducerState producerState;
+
+            private OpenTransaction(TopicPartition partition, ProducerState state) {
+                topicPartition = partition;
+                producerState = state;
+            }
+        }
+
+        /**
+         * Describes the producers of one batch of partitions and appends every open
+         * transaction older than {@code maxTransactionTimeoutMs} to {@code candidateTransactions}.
+         *
+         * <p>A producer counts as having an open transaction when its current transaction
+         * start offset is present. The age is the current time minus the producer's last
+         * timestamp.
+         *
+         * @param admin                   admin client used for the lookup
+         * @param brokerId                broker to send the request to, if any
+         * @param maxTransactionTimeoutMs age in milliseconds that a candidate must exceed
+         * @param topicPartitions         batch of partitions to describe
+         * @param candidateTransactions   output list the candidates are appended to
+         */
+        private void collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions,
+            List<OpenTransaction> candidateTransactions
+        ) throws Exception {
+            try {
+                DescribeProducersOptions describeOptions = new DescribeProducersOptions();
+                brokerId.ifPresent(describeOptions::brokerId);
+
+                Map<TopicPartition, DescribeProducersResult.PartitionProducerState> producersByPartition =
+                    admin.describeProducers(topicPartitions, describeOptions).all().get();
+
+                long currentTimeMs = time.milliseconds();
+
+                producersByPartition.forEach((topicPartition, producersStates) -> {
+                    producersStates.activeProducers().forEach(activeProducer -> {
+                        if (activeProducer.currentTransactionStartOffset().isPresent()) {
+                            long transactionDurationMs = currentTimeMs - activeProducer.lastTimestamp();
+                            if (transactionDurationMs > maxTransactionTimeoutMs) {
+                                candidateTransactions.add(new OpenTransaction(
+                                    topicPartition,
+                                    activeProducer
+                                ));
+                            }
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                // Concatenating the Optional itself would print "Optional[1]" or
+                // "Optional.empty"; name the broker only when one was requested.
+                String brokerSuffix = brokerId.map(id -> " on broker " + id).orElse("");
+                printErrorAndExit("Failed to describe producers for " + topicPartitions.size() +
+                    " partitions" + brokerSuffix, e.getCause());
+            }
+        }
+
+        /**
+         * Looks up the transactional id of each given producer id by listing transactions
+         * filtered on those producer ids.
+         *
+         * <p>Listings whose producer id was not requested are logged at debug level and
+         * skipped. Producer ids without a listing are absent from the result.
+         *
+         * @param admin       admin client used for the lookup
+         * @param producerIds producer ids to resolve
+         * @return transactional id per producer id
+         */
+        private Map<Long, String> lookupTransactionalIds(
+            Admin admin,
+            Set<Long> producerIds
+        ) throws Exception {
+            try {
+                ListTransactionsOptions listTransactionsOptions = new ListTransactionsOptions()
+                    .filterProducerIds(producerIds);
+
+                Collection<TransactionListing> transactionListings =
+                    admin.listTransactions(listTransactionsOptions).all().get();
+
+                Map<Long, String> transactionalIdMap = new HashMap<>();
+
+                transactionListings.forEach(listing -> {
+                    if (!producerIds.contains(listing.producerId())) {
+                        log.debug("Received transaction listing {} which has a producerId " +
+                            "which was not requested", listing);
+                    } else {
+                        transactionalIdMap.put(
+                            listing.producerId(),
+                            listing.transactionalId()
+                        );
+                    }
+                });
+
+                return transactionalIdMap;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list transactions for " + producerIds.size() +
+                    " producers", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        /**
+         * A single-argument callback whose {@code accept} method is allowed to throw a
+         * checked exception.
+         *
+         * @param <T> type of the consumed value
+         */
+        @FunctionalInterface
+        private interface ThrowableConsumer<T> {
+            void accept(T value) throws Exception;
+        }
+
+        /**
+         * Passes consecutive sub-lists of {@code list} to {@code consumer}, each holding at
+         * most {@code batchSize} elements. An empty list results in no call.
+         *
+         * @param list      elements to hand out in order
+         * @param batchSize maximum number of elements per call; must be positive
+         * @param consumer  callback invoked once per batch with a {@code subList} view
+         * @throws IllegalArgumentException if {@code batchSize} is not positive
+         * @throws Exception                whatever {@code consumer} throws
+         */
+        private <T> void consumeInBatches(
+            List<T> list,
+            int batchSize,
+            ThrowableConsumer<List<T>> consumer
+        ) throws Exception {
+            // A batch size of zero would never advance the start index and loop forever.
+            if (batchSize <= 0) {
+                throw new IllegalArgumentException("batchSize must be positive, but was " + batchSize);
+            }
+
+            int batchStartIndex = 0;
+            int limitIndex = list.size();
+
+            while (batchStartIndex < limitIndex) {
+                // Clamp the length rather than the end index so that a very large
+                // batchSize cannot overflow the addition.
+                int batchLength = Math.min(batchSize, limitIndex - batchStartIndex);
+                int batchEndIndex = batchStartIndex + batchLength;
+
+                consumer.accept(list.subList(batchStartIndex, batchEndIndex));
+                batchStartIndex = batchEndIndex;
+            }
+        }
+    }
+
     private static void appendColumnValue(
         StringBuilder rowBuilder,
         String value,
@@ -580,7 +1020,8 @@ public abstract class TransactionsCommand {
             new ListTransactionsCommand(time),
             new DescribeTransactionsCommand(time),
             new DescribeProducersCommand(time),
-            new AbortTransactionCommand(time)
+            new AbortTransactionCommand(time),
+            new FindHangingTransactionsCommand(time)
         );
 
         ArgumentParser parser = buildBaseParser();
diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
index b5d7b93..968c34a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -22,14 +22,21 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DescribeProducersOptions;
 import org.apache.kafka.clients.admin.DescribeProducersResult;
 import 
org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.ListTransactionsOptions;
 import org.apache.kafka.clients.admin.ListTransactionsResult;
 import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TransactionDescription;
 import org.apache.kafka.clients.admin.TransactionListing;
 import org.apache.kafka.clients.admin.TransactionState;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.MockTime;
@@ -48,7 +55,9 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -56,10 +65,14 @@ import java.util.Map;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.KafkaFuture.completedFuture;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -145,7 +158,7 @@ public class TransactionsCommandTest {
         DescribeProducersOptions expectedOptions
     ) throws Exception {
         DescribeProducersResult describeResult = 
Mockito.mock(DescribeProducersResult.class);
-        KafkaFuture<PartitionProducerState> describeFuture = 
KafkaFutureImpl.completedFuture(
+        KafkaFuture<PartitionProducerState> describeFuture = completedFuture(
             new PartitionProducerState(asList(
                 new ProducerState(12345L, 15, 1300, 1599509565L,
                     OptionalInt.of(20), OptionalLong.of(990)),
@@ -181,8 +194,6 @@ public class TransactionsCommandTest {
             "list"
         };
 
-        ListTransactionsResult listResult = 
Mockito.mock(ListTransactionsResult.class);
-
         Map<Integer, Collection<TransactionListing>> transactions = new 
HashMap<>();
         transactions.put(0, asList(
             new TransactionListing("foo", 12345L, TransactionState.ONGOING),
@@ -192,11 +203,7 @@ public class TransactionsCommandTest {
             new TransactionListing("baz", 13579L, 
TransactionState.COMPLETE_COMMIT)
         ));
 
-        KafkaFuture<Map<Integer, Collection<TransactionListing>>> 
listTransactionsFuture =
-            KafkaFutureImpl.completedFuture(transactions);
-
-        Mockito.when(admin.listTransactions()).thenReturn(listResult);
-        
Mockito.when(listResult.allByBrokerId()).thenReturn(listTransactionsFuture);
+        expectListTransactions(transactions);
 
         execute(args);
         assertNormalExit();
@@ -241,7 +248,7 @@ public class TransactionsCommandTest {
         int coordinatorId = 5;
         long transactionStartTime = time.milliseconds();
 
-        KafkaFuture<TransactionDescription> describeFuture = 
KafkaFutureImpl.completedFuture(
+        KafkaFuture<TransactionDescription> describeFuture = completedFuture(
             new TransactionDescription(
                 coordinatorId,
                 TransactionState.ONGOING,
@@ -373,14 +380,14 @@ public class TransactionsCommandTest {
         };
 
         DescribeProducersResult describeResult = 
Mockito.mock(DescribeProducersResult.class);
-        KafkaFuture<PartitionProducerState> describeFuture = 
KafkaFutureImpl.completedFuture(
+        KafkaFuture<PartitionProducerState> describeFuture = completedFuture(
             new PartitionProducerState(singletonList(
                 new ProducerState(producerId, producerEpoch, 1300, 1599509565L,
                     OptionalInt.of(coordinatorEpoch), 
OptionalLong.of(startOffset))
             )));
 
         AbortTransactionResult abortTransactionResult = 
Mockito.mock(AbortTransactionResult.class);
-        KafkaFuture<Void> abortFuture = KafkaFutureImpl.completedFuture(null);
+        KafkaFuture<Void> abortFuture = completedFuture(null);
         AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec(
             topicPartition, producerId, producerEpoch, coordinatorEpoch);
 
@@ -418,7 +425,7 @@ public class TransactionsCommandTest {
         };
 
         AbortTransactionResult abortTransactionResult = 
Mockito.mock(AbortTransactionResult.class);
-        KafkaFuture<Void> abortFuture = KafkaFutureImpl.completedFuture(null);
+        KafkaFuture<Void> abortFuture = completedFuture(null);
 
         final int expectedCoordinatorEpoch;
         if (coordinatorEpoch < 0) {
@@ -437,6 +444,579 @@ public class TransactionsCommandTest {
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(new ListTransactionsOptions(), listingsByBroker);
+    }
+
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
+        Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+
+        List<TransactionListing> allListings = new ArrayList<>();
+        listingsByBroker.values().forEach(allListings::addAll);
+
+        Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
+        Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+    }
+
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        PartitionProducerState partitionProducerState = new PartitionProducerState(singletonList(
+            new ProducerState(
+                producerId,
+                producerEpoch,
+                500,
+                lastTimestamp,
+                coordinatorEpoch,
+                txnStartOffset
+            )
+        ));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(
+            completedFuture(singletonMap(topicPartition, partitionProducerState))
+        );
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
+        descriptions.forEach((transactionalId, description) -> {
+            Mockito.when(result.description(transactionalId))
+                .thenReturn(completedFuture(description));
+        });
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(result.names()).thenReturn(completedFuture(topics));
+        Mockito.when(admin.listTopics()).thenReturn(result);
+    }
+
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws Exception {
+        int brokerId = 5;
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        String topic = "foo";
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+        assertNoHangingTransactions();
+    }
+
+    @Test
+    public void testFindHangingLookupTopicAndBrokerId() throws Exception {
+        int brokerId = 5;
+        String topic = "foo";
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId),
+            "--topic",
+            topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+        assertNoHangingTransactions();
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForTopic() throws Exception {
+        String topic = "foo";
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+        assertNoHangingTransactions();
+    }
+
+    private void assertNoHangingTransactions() throws Exception {
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingSpecifiedTopicPartition() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds();
+        OptionalInt coordinatorEpoch = OptionalInt.of(19);
+        OptionalLong txnStartOffset = OptionalLong.of(29384L);
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            coordinatorEpoch,
+            txnStartOffset
+        );
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingNoMappedTransactionalId() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60);
+        int coordinatorEpoch = 19;
+        long txnStartOffset = 29384L;
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            OptionalInt.of(coordinatorEpoch),
+            OptionalLong.of(txnStartOffset)
+        );
+
+        expectListTransactions(
+            new ListTransactionsOptions().filterProducerIds(singleton(producerId)),
+            singletonMap(1, Collections.emptyList())
+        );
+
+        expectDescribeTransactions(Collections.emptyMap());
+
+        execute(args);
+        assertNormalExit();
+
+        assertHangingTransaction(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            coordinatorEpoch,
+            txnStartOffset,
+            lastTimestamp
+        );
+    }
+
+    @Test
+    public void testFindHangingWithNoTransactionDescription() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60);
+        int coordinatorEpoch = 19;
+        long txnStartOffset = 29384L;
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            OptionalInt.of(coordinatorEpoch),
+            OptionalLong.of(txnStartOffset)
+        );
+
+        String transactionalId = "bar";
+        TransactionListing listing = new TransactionListing(
+            transactionalId,
+            producerId,
+            TransactionState.ONGOING
+        );
+
+        expectListTransactions(
+            new ListTransactionsOptions().filterProducerIds(singleton(producerId)),
+            singletonMap(1, Collections.singletonList(listing))
+        );
+
+        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
+        Mockito.when(result.description(transactionalId))
+            .thenReturn(failedFuture(new TransactionalIdNotFoundException(transactionalId + " not found")));
+        Mockito.when(admin.describeTransactions(singleton(transactionalId))).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        assertHangingTransaction(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            coordinatorEpoch,
+            txnStartOffset,
+            lastTimestamp
+        );
+    }
+
+    private <T> KafkaFuture<T> failedFuture(Exception e) {
+        KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
+        future.completeExceptionally(e);
+        return future;
+    }
+
+    @Test
+    public void testFindHangingDoesNotFilterByTransactionInProgressWithDifferentPartitions() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60);
+        int coordinatorEpoch = 19;
+        long txnStartOffset = 29384L;
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            OptionalInt.of(coordinatorEpoch),
+            OptionalLong.of(txnStartOffset)
+        );
+
+        String transactionalId = "bar";
+        TransactionListing listing = new TransactionListing(
+            transactionalId,
+            producerId,
+            TransactionState.ONGOING
+        );
+
+        expectListTransactions(
+            new ListTransactionsOptions().filterProducerIds(singleton(producerId)),
+            singletonMap(1, Collections.singletonList(listing))
+        );
+
+        // Although there is a transaction in progress from the same
+        // producer epoch, it does not include the topic partition we
+        // found when describing producers.
+        TransactionDescription description = new TransactionDescription(
+            1,
+            TransactionState.ONGOING,
+            producerId,
+            producerEpoch,
+            60000,
+            OptionalLong.of(time.milliseconds()),
+            singleton(new TopicPartition("foo", 10))
+        );
+
+        expectDescribeTransactions(singletonMap(transactionalId, description));
+
+        execute(args);
+        assertNormalExit();
+
+        assertHangingTransaction(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            coordinatorEpoch,
+            txnStartOffset,
+            lastTimestamp
+        );
+    }
+
+    private void assertHangingTransaction(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        int coordinatorEpoch,
+        long txnStartOffset,
+        long lastTimestamp
+    ) throws Exception {
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(2, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+
+        long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(time.milliseconds() - lastTimestamp);
+
+        List<String> expectedRow = asList(
+            topicPartition.topic(),
+            String.valueOf(topicPartition.partition()),
+            String.valueOf(producerId),
+            String.valueOf(producerEpoch),
+            String.valueOf(coordinatorEpoch),
+            String.valueOf(txnStartOffset),
+            String.valueOf(lastTimestamp),
+            String.valueOf(durationMinutes)
+        );
+        assertEquals(expectedRow, table.get(1));
+    }
+
+    @Test
+    public void testFindHangingFilterByTransactionInProgressWithSamePartition() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60);
+        int coordinatorEpoch = 19;
+        long txnStartOffset = 29384L;
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            OptionalInt.of(coordinatorEpoch),
+            OptionalLong.of(txnStartOffset)
+        );
+
+        String transactionalId = "bar";
+        TransactionListing listing = new TransactionListing(
+            transactionalId,
+            producerId,
+            TransactionState.ONGOING
+        );
+
+        expectListTransactions(
+            new ListTransactionsOptions().filterProducerIds(singleton(producerId)),
+            singletonMap(1, Collections.singletonList(listing))
+        );
+
+        // The coordinator shows an active transaction with the same epoch
+        // which includes the partition, so no hanging transaction should
+        // be detected.
+        TransactionDescription description = new TransactionDescription(
+            1,
+            TransactionState.ONGOING,
+            producerId,
+            producerEpoch,
+            60000,
+            OptionalLong.of(lastTimestamp),
+            singleton(topicPartition)
+        );
+
+        expectDescribeTransactions(singletonMap(transactionalId, description));
+
+        execute(args);
+        assertNormalExit();
+        assertNoHangingTransactions();
+    }
+
     private void execute(String[] args) throws Exception {
         TransactionsCommand.execute(args, ns -> admin, out, time);
     }

Reply via email to