This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f29c43b KAFKA-12979; Implement command to find hanging transactions (#10974)
f29c43b is described below
commit f29c43bdbb85fd7c97ad2f567dcd67f2f2ece8ef
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Jul 6 10:39:59 2021 -0700
KAFKA-12979; Implement command to find hanging transactions (#10974)
This patch implements the `find-hanging` command described in KIP-664:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions#KIP664:Providetoolingtodetectandaborthangingtransactions-FindingHangingTransactions.
Reviewers: Luke Chen <[email protected]>, David Jacot <[email protected]>
---
.../clients/admin/ListTransactionsOptions.java | 15 +
.../apache/kafka/tools/TransactionsCommand.java | 445 ++++++++++++++-
.../kafka/tools/TransactionsCommandTest.java | 604 ++++++++++++++++++++-
3 files changed, 1050 insertions(+), 14 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
index 3b2d902..c23d444 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
/**
@@ -88,4 +89,18 @@ public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOpt
", timeoutMs=" + timeoutMs +
')';
}
+
+    /**
+     * Two instances are equal when they are of the same class and filter on the same
+     * transaction states and the same producer ids. No other state is compared.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (other == null || other.getClass() != getClass()) {
+            return false;
+        }
+        ListTransactionsOptions options = (ListTransactionsOptions) other;
+        return Objects.equals(filteredProducerIds, options.filteredProducerIds)
+            && Objects.equals(filteredStates, options.filteredStates);
+    }
+
+    /**
+     * Hash code derived from the same two fields that {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(filteredStates, filteredProducerIds);
+    }
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index 2419460..b8f1c9d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -28,10 +28,15 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -43,12 +48,17 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -434,7 +444,7 @@ public abstract class TransactionsCommand {
final Map<Integer, Collection<TransactionListing>> result;
try {
- result = admin.listTransactions()
+ result = admin.listTransactions(new ListTransactionsOptions())
.allByBrokerId()
.get();
} catch (ExecutionException e) {
@@ -461,6 +471,436 @@ public abstract class TransactionsCommand {
}
}
+    /**
+     * The {@code find-hanging} command.
+     *
+     * <p>It collects the partitions selected by {@code --broker-id}, {@code --topic} and
+     * {@code --partition}. It then finds producers with an open transaction whose last
+     * timestamp is older than {@code --max-transaction-timeout}. Finally, it reports those
+     * open transactions that the transaction coordinator does not list as still covering
+     * that partition.
+     */
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        // Upper bound on the number of topics or partitions sent in a single admin request.
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search (15 minutes by default)")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to (required if --partition is specified)")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        /**
+         * Runs the search and prints a table of hanging transactions.
+         *
+         * <p>Only the header row is printed when nothing is found. The command fails if
+         * neither {@code --topic} nor {@code --broker-id} is given. It also fails if
+         * {@code --partition} is given without {@code --topic}.
+         */
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                // Nothing has been idle long enough, so the coordinator lookups are skipped.
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        /**
+         * Resolves the partitions to search.
+         *
+         * <p>An explicit topic and partition are returned directly. Otherwise the given topic,
+         * or all listed topics, is described. When a broker id is given, the result is
+         * narrowed to partitions with a replica on that broker.
+         */
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        /**
+         * Selects the candidates that the coordinator is not expected to complete.
+         *
+         * <p>A candidate is hanging when its producer id maps to no transactional id, or when
+         * the transactional id has no description. It is also hanging when the description
+         * does not list the candidate's partition.
+         */
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state.
+                    TransactionDescription description = descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : openTransactions) {
+                            // The `DescribeTransactions` API returns all partitions being
+                            // written to in an ongoing transaction and any partition which
+                            // does not yet have markers written when in the `PendingAbort` or
+                            // `PendingCommit` states. If the topic partition that we found is
+                            // among these, then we can still expect the coordinator to write
+                            // the marker. Otherwise, it is a hanging transaction.
+                            if (!description.topicPartitions().contains(openTransaction.topicPartition)) {
+                                hangingTransactions.add(openTransaction);
+                            }
+                        }
+                    }
+                }
+            });
+
+            return hangingTransactions;
+        }
+
+        /**
+         * Prints one table row per hanging transaction.
+         *
+         * <p>The duration is the number of whole minutes since the producer's last timestamp.
+         * A missing coordinator epoch or start offset is printed as -1.
+         */
+        private void printHangingTransactions(
+            List<OpenTransaction> hangingTransactions,
+            PrintStream out
+        ) {
+            long currentTimeMs = time.milliseconds();
+            List<String[]> rows = new ArrayList<>(hangingTransactions.size());
+
+            for (OpenTransaction transaction : hangingTransactions) {
+                long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes(
+                    currentTimeMs - transaction.producerState.lastTimestamp());
+
+                rows.add(new String[] {
+                    transaction.topicPartition.topic(),
+                    String.valueOf(transaction.topicPartition.partition()),
+                    String.valueOf(transaction.producerState.producerId()),
+                    String.valueOf(transaction.producerState.producerEpoch()),
+                    String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)),
+                    String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
+                    String.valueOf(transaction.producerState.lastTimestamp()),
+                    String.valueOf(transactionDurationMinutes)
+                });
+            }
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+
+        /**
+         * Describes each transactional id.
+         *
+         * <p>Ids that fail with {@link TransactionalIdNotFoundException} are mapped to
+         * {@code null}. Any other failure reports an error and exits.
+         */
+        private Map<String, TransactionDescription> describeTransactions(
+            Admin admin,
+            Collection<String> transactionalIds
+        ) throws Exception {
+            try {
+                DescribeTransactionsResult result = admin.describeTransactions(new HashSet<>(transactionalIds));
+                Map<String, TransactionDescription> descriptions = new HashMap<>();
+
+                for (String transactionalId : transactionalIds) {
+                    try {
+                        TransactionDescription description = result.description(transactionalId).get();
+                        descriptions.put(transactionalId, description);
+                    } catch (ExecutionException e) {
+                        if (e.getCause() instanceof TransactionalIdNotFoundException) {
+                            descriptions.put(transactionalId, null);
+                        } else {
+                            // Rethrown to the outer handler below.
+                            throw e;
+                        }
+                    }
+                }
+
+                return descriptions;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + transactionalIds.size()
+                    + " transactions", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        /**
+         * Groups open transactions by producer id.
+         */
+        private Map<Long, List<OpenTransaction>> groupByProducerId(
+            List<OpenTransaction> openTransactions
+        ) {
+            Map<Long, List<OpenTransaction>> res = new HashMap<>();
+            for (OpenTransaction transaction : openTransactions) {
+                List<OpenTransaction> states = res.computeIfAbsent(
+                    transaction.producerState.producerId(),
+                    __ -> new ArrayList<>()
+                );
+                states.add(transaction);
+            }
+            return res;
+        }
+
+        /**
+         * Returns every topic name reported by {@code listTopics()}.
+         */
+        private List<String> listTopics(
+            Admin admin
+        ) throws Exception {
+            try {
+                return new ArrayList<>(admin.listTopics().names().get());
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list topics", e.getCause());
+                return Collections.emptyList();
+            }
+        }
+
+        /**
+         * Describes {@code topics} in batches of at most {@link #MAX_BATCH_SIZE}.
+         *
+         * <p>Returns their partitions, filtered to those with a replica on {@code brokerId}
+         * when a broker id is present.
+         */
+        private List<TopicPartition> findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics
+        ) throws Exception {
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+                findTopicPartitions(
+                    admin,
+                    brokerId,
+                    batch,
+                    topicPartitions
+                );
+            });
+            return topicPartitions;
+        }
+
+        /**
+         * Describes one batch of topics and appends the matching partitions to
+         * {@code topicPartitions}.
+         */
+        private void findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            try {
+                Map<String, TopicDescription> topicDescriptions = admin.describeTopics(topics).all().get();
+                topicDescriptions.forEach((topic, description) -> {
+                    description.partitions().forEach(partitionInfo -> {
+                        if (!brokerId.isPresent() || hasReplica(brokerId.get(), partitionInfo)) {
+                            topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
+            }
+        }
+
+        /**
+         * Returns true if any replica of the partition is on {@code brokerId}.
+         */
+        private boolean hasReplica(
+            int brokerId,
+            TopicPartitionInfo partitionInfo
+        ) {
+            return partitionInfo.replicas().stream().anyMatch(node -> node.id() == brokerId);
+        }
+
+        /**
+         * Finds candidate open transactions across {@code topicPartitions}, querying them in
+         * batches of at most {@link #MAX_BATCH_SIZE} partitions.
+         */
+        private List<OpenTransaction> collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            // We have to check all partitions on the broker. In order to avoid
+            // overwhelming it with a giant request, we break the requests into
+            // smaller batches.
+
+            List<OpenTransaction> candidateTransactions = new ArrayList<>();
+
+            consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+                collectCandidateOpenTransactions(
+                    admin,
+                    brokerId,
+                    maxTransactionTimeoutMs,
+                    batch,
+                    candidateTransactions
+                );
+            });
+
+            return candidateTransactions;
+        }
+
+        /**
+         * A producer's open transaction on a specific partition.
+         */
+        private static class OpenTransaction {
+            private final TopicPartition topicPartition;
+            private final ProducerState producerState;
+
+            private OpenTransaction(
+                TopicPartition topicPartition,
+                ProducerState producerState
+            ) {
+                this.topicPartition = topicPartition;
+                this.producerState = producerState;
+            }
+        }
+
+        /**
+         * Describes the producers of one batch of partitions and collects candidates.
+         *
+         * <p>The describe request is directed to {@code brokerId} when one is given. A
+         * producer is a candidate when it has a current transaction start offset and its last
+         * timestamp is more than {@code maxTransactionTimeoutMs} in the past.
+         */
+        private void collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions,
+            List<OpenTransaction> candidateTransactions
+        ) throws Exception {
+            try {
+                DescribeProducersOptions describeOptions = new DescribeProducersOptions();
+                brokerId.ifPresent(describeOptions::brokerId);
+
+                Map<TopicPartition, DescribeProducersResult.PartitionProducerState> producersByPartition =
+                    admin.describeProducers(topicPartitions, describeOptions).all().get();
+
+                long currentTimeMs = time.milliseconds();
+
+                producersByPartition.forEach((topicPartition, producersStates) -> {
+                    producersStates.activeProducers().forEach(activeProducer -> {
+                        if (activeProducer.currentTransactionStartOffset().isPresent()) {
+                            long transactionDurationMs = currentTimeMs - activeProducer.lastTimestamp();
+                            if (transactionDurationMs > maxTransactionTimeoutMs) {
+                                candidateTransactions.add(new OpenTransaction(
+                                    topicPartition,
+                                    activeProducer
+                                ));
+                            }
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                // Print the plain broker id. Concatenating the Optional itself would print
+                // "Optional[5]" or "Optional.empty". Omit the clause when no broker was given.
+                String brokerScope = brokerId.map(id -> " on broker " + id).orElse("");
+                printErrorAndExit("Failed to describe producers for " + topicPartitions.size() +
+                    " partitions" + brokerScope, e.getCause());
+            }
+        }
+
+        /**
+         * Maps each requested producer id to its transactional id.
+         *
+         * <p>Uses {@code listTransactions} filtered on {@code producerIds}. Listings for
+         * producer ids that were not requested are ignored. Producer ids with no listing are
+         * absent from the result.
+         */
+        private Map<Long, String> lookupTransactionalIds(
+            Admin admin,
+            Set<Long> producerIds
+        ) throws Exception {
+            try {
+                ListTransactionsOptions listTransactionsOptions = new ListTransactionsOptions()
+                    .filterProducerIds(producerIds);
+
+                Collection<TransactionListing> transactionListings =
+                    admin.listTransactions(listTransactionsOptions).all().get();
+
+                Map<Long, String> transactionalIdMap = new HashMap<>();
+
+                transactionListings.forEach(listing -> {
+                    if (!producerIds.contains(listing.producerId())) {
+                        log.debug("Received transaction listing {} which has a producerId " +
+                            "which was not requested", listing);
+                    } else {
+                        transactionalIdMap.put(
+                            listing.producerId(),
+                            listing.transactionalId()
+                        );
+                    }
+                });
+
+                return transactionalIdMap;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list transactions for " + producerIds.size() +
+                    " producers", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        /**
+         * A consumer whose {@code accept} may throw a checked exception.
+         */
+        @FunctionalInterface
+        private interface ThrowableConsumer<T> {
+            void accept(T t) throws Exception;
+        }
+
+        /**
+         * Passes consecutive {@code subList} views of at most {@code batchSize} elements to
+         * {@code consumer}, in order.
+         */
+        private <T> void consumeInBatches(
+            List<T> list,
+            int batchSize,
+            ThrowableConsumer<List<T>> consumer
+        ) throws Exception {
+            int batchStartIndex = 0;
+            int limitIndex = list.size();
+
+            while (batchStartIndex < limitIndex) {
+                int batchEndIndex = Math.min(
+                    limitIndex,
+                    batchStartIndex + batchSize
+                );
+
+                consumer.accept(list.subList(batchStartIndex, batchEndIndex));
+                batchStartIndex = batchEndIndex;
+            }
+        }
+    }
+
private static void appendColumnValue(
StringBuilder rowBuilder,
String value,
@@ -580,7 +1020,8 @@ public abstract class TransactionsCommand {
new ListTransactionsCommand(time),
new DescribeTransactionsCommand(time),
new DescribeProducersCommand(time),
- new AbortTransactionCommand(time)
+ new AbortTransactionCommand(time),
+ new FindHangingTransactionsCommand(time)
);
ArgumentParser parser = buildBaseParser();
diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
index b5d7b93..968c34a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -22,14 +22,21 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import
org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.clients.admin.TransactionState;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.MockTime;
@@ -48,7 +55,9 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -56,10 +65,14 @@ import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.KafkaFuture.completedFuture;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -145,7 +158,7 @@ public class TransactionsCommandTest {
DescribeProducersOptions expectedOptions
) throws Exception {
DescribeProducersResult describeResult =
Mockito.mock(DescribeProducersResult.class);
- KafkaFuture<PartitionProducerState> describeFuture =
KafkaFutureImpl.completedFuture(
+ KafkaFuture<PartitionProducerState> describeFuture = completedFuture(
new PartitionProducerState(asList(
new ProducerState(12345L, 15, 1300, 1599509565L,
OptionalInt.of(20), OptionalLong.of(990)),
@@ -181,8 +194,6 @@ public class TransactionsCommandTest {
"list"
};
- ListTransactionsResult listResult =
Mockito.mock(ListTransactionsResult.class);
-
Map<Integer, Collection<TransactionListing>> transactions = new
HashMap<>();
transactions.put(0, asList(
new TransactionListing("foo", 12345L, TransactionState.ONGOING),
@@ -192,11 +203,7 @@ public class TransactionsCommandTest {
new TransactionListing("baz", 13579L,
TransactionState.COMPLETE_COMMIT)
));
- KafkaFuture<Map<Integer, Collection<TransactionListing>>>
listTransactionsFuture =
- KafkaFutureImpl.completedFuture(transactions);
-
- Mockito.when(admin.listTransactions()).thenReturn(listResult);
-
Mockito.when(listResult.allByBrokerId()).thenReturn(listTransactionsFuture);
+ expectListTransactions(transactions);
execute(args);
assertNormalExit();
@@ -241,7 +248,7 @@ public class TransactionsCommandTest {
int coordinatorId = 5;
long transactionStartTime = time.milliseconds();
- KafkaFuture<TransactionDescription> describeFuture =
KafkaFutureImpl.completedFuture(
+ KafkaFuture<TransactionDescription> describeFuture = completedFuture(
new TransactionDescription(
coordinatorId,
TransactionState.ONGOING,
@@ -373,14 +380,14 @@ public class TransactionsCommandTest {
};
DescribeProducersResult describeResult =
Mockito.mock(DescribeProducersResult.class);
- KafkaFuture<PartitionProducerState> describeFuture =
KafkaFutureImpl.completedFuture(
+ KafkaFuture<PartitionProducerState> describeFuture = completedFuture(
new PartitionProducerState(singletonList(
new ProducerState(producerId, producerEpoch, 1300, 1599509565L,
OptionalInt.of(coordinatorEpoch),
OptionalLong.of(startOffset))
)));
AbortTransactionResult abortTransactionResult =
Mockito.mock(AbortTransactionResult.class);
- KafkaFuture<Void> abortFuture = KafkaFutureImpl.completedFuture(null);
+ KafkaFuture<Void> abortFuture = completedFuture(null);
AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec(
topicPartition, producerId, producerEpoch, coordinatorEpoch);
@@ -418,7 +425,7 @@ public class TransactionsCommandTest {
};
AbortTransactionResult abortTransactionResult =
Mockito.mock(AbortTransactionResult.class);
- KafkaFuture<Void> abortFuture = KafkaFutureImpl.completedFuture(null);
+ KafkaFuture<Void> abortFuture = completedFuture(null);
final int expectedCoordinatorEpoch;
if (coordinatorEpoch < 0) {
@@ -437,6 +444,579 @@ public class TransactionsCommandTest {
assertNormalExit();
}
+ @Test
+ public void testFindHangingRequiresEitherBrokerIdOrTopic() throws
Exception {
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "find-hanging"
+ });
+ }
+
+ @Test
+ public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws
Exception {
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "find-hanging",
+ "--broker-id",
+ "0",
+ "--partition",
+ "5"
+ });
+ }
+
+    /**
+     * Stubs {@code listTransactions} called with default options so that it returns the
+     * given per-broker listings.
+     */
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsOptions defaultOptions = new ListTransactionsOptions();
+        expectListTransactions(defaultOptions, listingsByBroker);
+    }
+
+    /**
+     * Stubs {@code admin.listTransactions(options)}.
+     *
+     * <p>{@code allByBrokerId()} of the returned result completes with
+     * {@code listingsByBroker}. {@code all()} completes with every listing flattened into
+     * one list.
+     */
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        List<TransactionListing> flattened = new ArrayList<>();
+        for (Collection<TransactionListing> brokerListings : listingsByBroker.values()) {
+            flattened.addAll(brokerListings);
+        }
+
+        ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
+        Mockito.when(listResult.all()).thenReturn(completedFuture(flattened));
+        Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+        Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+    }
+
+    /**
+     * Stubs {@code describeProducers} for a single partition with default options.
+     *
+     * <p>The result reports exactly one active producer built from the given fields, with
+     * 500 passed as the third {@code ProducerState} constructor argument.
+     */
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        ProducerState producer = new ProducerState(
+            producerId, producerEpoch, 500, lastTimestamp, coordinatorEpoch, txnStartOffset);
+        PartitionProducerState partitionState = new PartitionProducerState(singletonList(producer));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all())
+            .thenReturn(completedFuture(singletonMap(topicPartition, partitionState)));
+
+        Mockito.when(admin.describeProducers(
+            singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    /**
+     * Stubs {@code describeTransactions} for exactly the key set of {@code descriptions}.
+     *
+     * <p>Each per-id {@code description(...)} future and the aggregate {@code all()} future
+     * complete with the supplied values.
+     */
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
+        for (Map.Entry<String, TransactionDescription> entry : descriptions.entrySet()) {
+            Mockito.when(result.description(entry.getKey()))
+                .thenReturn(completedFuture(entry.getValue()));
+        }
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    /**
+     * Stubs {@code listTopics()} so that {@code names()} completes with {@code topics}.
+     */
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult listTopicsResult = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(admin.listTopics()).thenReturn(listTopicsResult);
+        Mockito.when(listTopicsResult.names()).thenReturn(completedFuture(topics));
+    }
+
+    /**
+     * Stubs {@code describeTopics} for the key set of {@code descriptions}.
+     *
+     * <p>The topic names are copied into a list in the map's iteration order. {@code all()}
+     * completes with the given descriptions.
+     */
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        List<String> topicNames = new ArrayList<>(descriptions.keySet());
+        DescribeTopicsResult describeTopicsResult = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(describeTopicsResult.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(topicNames)).thenReturn(describeTopicsResult);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws Exception {
+        int brokerId = 5;
+        String topic = "foo";
+
+        String[] args = {
+            "--bootstrap-server", "localhost:9092",
+            "find-hanging",
+            "--broker-id", String.valueOf(brokerId)
+        };
+
+        // No --topic is given, so all topics are listed and then described.
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        // Only partition 1 has a replica on broker 5.
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0, node0, asList(node0, node1), asList(node0, node1));
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1, node1, asList(node1, node5), asList(node1, node5));
+
+        expectDescribeTopics(singletonMap(
+            topic,
+            new TopicDescription(topic, false, asList(partition0, partition1))
+        ));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+        Mockito.when(admin.describeProducers(
+            singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+        assertNoHangingTransactions();
+    }
+
+    @Test
+    public void testFindHangingLookupTopicAndBrokerId() throws Exception {
+        int brokerId = 5;
+        String topic = "foo";
+
+        String[] args = {
+            "--bootstrap-server", "localhost:9092",
+            "find-hanging",
+            "--broker-id", String.valueOf(brokerId),
+            "--topic", topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        // Only the named topic is described. Of its partitions, only partition 1 has a
+        // replica on broker 5, so it alone should be searched.
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0, node0, asList(node0, node1), asList(node0, node1));
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1, node1, asList(node1, node5), asList(node1, node5));
+
+        expectDescribeTopics(singletonMap(
+            topic,
+            new TopicDescription(topic, false, asList(partition0, partition1))
+        ));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+        Mockito.when(admin.describeProducers(
+            singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+        assertNoHangingTransactions();
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForTopic() throws Exception {
+        String topic = "foo";
+
+        String[] args = {
+            "--bootstrap-server", "localhost:9092",
+            "find-hanging",
+            "--topic", topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0, node0, asList(node0, node1), asList(node0, node1));
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1, node1, asList(node1, node5), asList(node1, node5));
+
+        expectDescribeTopics(singletonMap(
+            topic,
+            new TopicDescription(topic, false, asList(partition0, partition1))
+        ));
+
+        // Without --broker-id, every partition of the topic is searched with default options.
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+        Mockito.when(admin.describeProducers(
+            asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+        assertNoHangingTransactions();
+    }
+
+    /**
+     * Asserts that the command printed only the header row of the {@code find-hanging} table.
+     */
+    private void assertNoHangingTransactions() throws Exception {
+        List<List<String>> rows = readOutputAsTable();
+        assertEquals(1, rows.size());
+        assertEquals(asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS), rows.get(0));
+    }
+
+    @Test
+    public void testFindHangingSpecifiedTopicPartition() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        // The producer's last timestamp is "now", so the open transaction is within the
+        // default --max-transaction-timeout and must not be reported as hanging.
+        long lastTimestamp = time.milliseconds();
+        OptionalInt coordinatorEpoch = OptionalInt.of(19);
+        OptionalLong txnStartOffset = OptionalLong.of(29384L);
+
+        // Only describeProducers for this single partition is stubbed: an explicit
+        // --topic/--partition should be searched without listing or describing topics.
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            coordinatorEpoch,
+            txnStartOffset
+        );
+
+        execute(args);
+        assertNormalExit();
+        assertNoHangingTransactions();
+
+        // With no candidate transactions, the coordinator must not be consulted.
+        Mockito.verify(admin, Mockito.never()).listTransactions(Mockito.any(ListTransactionsOptions.class));
+    }
+
+    @Test
+    public void testFindHangingNoMappedTransactionalId() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = {
+            "--bootstrap-server", "localhost:9092",
+            "find-hanging",
+            "--topic", topicPartition.topic(),
+            "--partition", String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        int coordinatorEpoch = 19;
+        long txnStartOffset = 29384L;
+        // Sixty minutes of inactivity exceeds the default 15 minute timeout.
+        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60);
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            OptionalInt.of(coordinatorEpoch),
+            OptionalLong.of(txnStartOffset)
+        );
+
+        // No transactional id is listed for the producer id, so the transaction is hanging.
+        expectListTransactions(
+            new ListTransactionsOptions().filterProducerIds(singleton(producerId)),
+            singletonMap(1, Collections.emptyList())
+        );
+        expectDescribeTransactions(Collections.emptyMap());
+
+        execute(args);
+        assertNormalExit();
+        assertHangingTransaction(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            coordinatorEpoch,
+            txnStartOffset,
+            lastTimestamp
+        );
+    }
+
+    /**
+     * Describes a producer last active one hour before the test clock. The
+     * producer id maps to transactional id "bar" (listed as ONGOING), but the
+     * description future for that id fails with
+     * {@link TransactionalIdNotFoundException}. The command must still exit
+     * normally and report the transaction as hanging.
+     */
+    @Test
+    public void testFindHangingWithNoTransactionDescription() throws Exception {
+        long producerId = 132L;
+        short producerEpoch = 5;
+        int coordinatorEpoch = 19;
+        long txnStartOffset = 29384L;
+        long lastTimestamp = time.milliseconds() - TimeUnit.HOURS.toMillis(1);
+
+        TopicPartition partition = new TopicPartition("foo", 5);
+        String[] args = {
+            "--bootstrap-server", "localhost:9092",
+            "find-hanging",
+            "--topic", partition.topic(),
+            "--partition", Integer.toString(partition.partition())
+        };
+
+        expectDescribeProducers(
+            partition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            OptionalInt.of(coordinatorEpoch),
+            OptionalLong.of(txnStartOffset)
+        );
+
+        String transactionalId = "bar";
+        TransactionListing ongoing =
+            new TransactionListing(transactionalId, producerId, TransactionState.ONGOING);
+        ListTransactionsOptions byProducerId =
+            new ListTransactionsOptions().filterProducerIds(singleton(producerId));
+        expectListTransactions(byProducerId, singletonMap(1, Collections.singletonList(ongoing)));
+
+        // The describe call for the mapped id completes exceptionally.
+        DescribeTransactionsResult describeResult = Mockito.mock(DescribeTransactionsResult.class);
+        Mockito.when(describeResult.description(transactionalId))
+            .thenReturn(failedFuture(new TransactionalIdNotFoundException(transactionalId + " not found")));
+        Mockito.when(admin.describeTransactions(singleton(transactionalId))).thenReturn(describeResult);
+
+        execute(args);
+        assertNormalExit();
+
+        assertHangingTransaction(partition, producerId, producerEpoch,
+            coordinatorEpoch, txnStartOffset, lastTimestamp);
+    }
+
+    /**
+     * Creates a {@link KafkaFuture} that is already completed exceptionally with
+     * {@code cause}, for stubbing admin results that should fail.
+     */
+    private <T> KafkaFuture<T> failedFuture(Exception cause) {
+        KafkaFutureImpl<T> failed = new KafkaFutureImpl<>();
+        failed.completeExceptionally(cause);
+        return failed;
+    }
+
+    /**
+     * The producer id maps to transactional id "bar", and the coordinator
+     * describes an ONGOING transaction with the same producer id and epoch. That
+     * transaction only covers foo-10 rather than the described partition foo-5,
+     * so the producer state on foo-5 must still be reported as hanging.
+     */
+    @Test
+    public void testFindHangingDoesNotFilterByTransactionInProgressWithDifferentPartitions() throws Exception {
+        long producerId = 132L;
+        short producerEpoch = 5;
+        int coordinatorEpoch = 19;
+        long txnStartOffset = 29384L;
+        long lastTimestamp = time.milliseconds() - TimeUnit.HOURS.toMillis(1);
+
+        TopicPartition partition = new TopicPartition("foo", 5);
+        String[] args = {
+            "--bootstrap-server", "localhost:9092",
+            "find-hanging",
+            "--topic", partition.topic(),
+            "--partition", Integer.toString(partition.partition())
+        };
+
+        expectDescribeProducers(
+            partition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            OptionalInt.of(coordinatorEpoch),
+            OptionalLong.of(txnStartOffset)
+        );
+
+        String transactionalId = "bar";
+        TransactionListing ongoing =
+            new TransactionListing(transactionalId, producerId, TransactionState.ONGOING);
+        ListTransactionsOptions byProducerId =
+            new ListTransactionsOptions().filterProducerIds(singleton(producerId));
+        expectListTransactions(byProducerId, singletonMap(1, Collections.singletonList(ongoing)));
+
+        // Same producer id and epoch, but the in-progress transaction touches
+        // a different partition than the one found via describeProducers.
+        TopicPartition otherPartition = new TopicPartition("foo", 10);
+        TransactionDescription inProgress = new TransactionDescription(
+            1,
+            TransactionState.ONGOING,
+            producerId,
+            producerEpoch,
+            60000,
+            OptionalLong.of(time.milliseconds()),
+            singleton(otherPartition)
+        );
+        expectDescribeTransactions(singletonMap(transactionalId, inProgress));
+
+        execute(args);
+        assertNormalExit();
+
+        assertHangingTransaction(partition, producerId, producerEpoch,
+            coordinatorEpoch, txnStartOffset, lastTimestamp);
+    }
+
+    /**
+     * Asserts that the command printed exactly two rows: the find-hanging header
+     * row and one row describing the given producer state. The final column is
+     * the time elapsed from {@code lastTimestamp} to the current {@code time},
+     * truncated to whole minutes.
+     */
+    private void assertHangingTransaction(
+        TopicPartition partition,
+        long producerId,
+        short producerEpoch,
+        int coordinatorEpoch,
+        long txnStartOffset,
+        long lastTimestamp
+    ) throws Exception {
+        List<List<String>> rows = readOutputAsTable();
+        assertEquals(2, rows.size());
+        assertEquals(asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS), rows.get(0));
+
+        long elapsedMs = time.milliseconds() - lastTimestamp;
+        List<String> expectedRow = asList(
+            partition.topic(),
+            Integer.toString(partition.partition()),
+            Long.toString(producerId),
+            Short.toString(producerEpoch),
+            Integer.toString(coordinatorEpoch),
+            Long.toString(txnStartOffset),
+            Long.toString(lastTimestamp),
+            Long.toString(TimeUnit.MILLISECONDS.toMinutes(elapsedMs))
+        );
+        assertEquals(expectedRow, rows.get(1));
+    }
+
+    /**
+     * The coordinator describes an ONGOING transaction for the same producer id
+     * and epoch that includes the described partition, so the command is
+     * expected to report no hanging transactions.
+     */
+    @Test
+    public void testFindHangingFilterByTransactionInProgressWithSamePartition() throws Exception {
+        long producerId = 132L;
+        short producerEpoch = 5;
+        int coordinatorEpoch = 19;
+        long txnStartOffset = 29384L;
+        long lastTimestamp = time.milliseconds() - TimeUnit.HOURS.toMillis(1);
+
+        TopicPartition partition = new TopicPartition("foo", 5);
+        String[] args = {
+            "--bootstrap-server", "localhost:9092",
+            "find-hanging",
+            "--topic", partition.topic(),
+            "--partition", Integer.toString(partition.partition())
+        };
+
+        expectDescribeProducers(
+            partition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            OptionalInt.of(coordinatorEpoch),
+            OptionalLong.of(txnStartOffset)
+        );
+
+        String transactionalId = "bar";
+        TransactionListing ongoing =
+            new TransactionListing(transactionalId, producerId, TransactionState.ONGOING);
+        ListTransactionsOptions byProducerId =
+            new ListTransactionsOptions().filterProducerIds(singleton(producerId));
+        expectListTransactions(byProducerId, singletonMap(1, Collections.singletonList(ongoing)));
+
+        // Same producer id and epoch, and the in-progress transaction covers
+        // exactly the partition found via describeProducers.
+        TransactionDescription inProgress = new TransactionDescription(
+            1,
+            TransactionState.ONGOING,
+            producerId,
+            producerEpoch,
+            60000,
+            OptionalLong.of(lastTimestamp),
+            singleton(partition)
+        );
+        expectDescribeTransactions(singletonMap(transactionalId, inProgress));
+
+        execute(args);
+        assertNormalExit();
+        assertNoHangingTransactions();
+    }
+
/**
 * Runs the transactions tool with {@code args}. The second argument (presumably
 * the admin-client factory) ignores its input and always returns the mocked
 * {@code admin}; the tool is also handed the test's {@code out} and {@code time}.
 */
private void execute(String[] args) throws Exception {
TransactionsCommand.execute(args, ns -> admin, out, time);
}