This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b4e96913cc6 KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to 
ListTransactionsRequest (#15384)
b4e96913cc6 is described below

commit b4e96913cc6c827968e47a31261e0bd8fdf677b5
Author: Yang Yu <[email protected]>
AuthorDate: Sat Feb 24 08:09:23 2024 -0600

    KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest 
(#15384)
    
    Introduces a new filter in ListTransactionsRequest API. This enables callers 
to filter on transactions that have been running for longer than a certain 
duration of time.
    
    This PR includes the following changes:
    
    bumps version for ListTransactionsRequest API to 1. The durationFilter must 
be left at -1L when communicating with an older broker that does not support 
version 1; otherwise building the request fails with UnsupportedVersionException.
    bumps version for ListTransactionsResponse to 1 without changing the 
response structure.
    adds a --duration-filter option to the kafka-transactions.sh list command
    Tests:
    
    Client side test to build request with correct combination of duration 
filter and API version: testBuildRequestWithDurationFilter
    Server side test to filter transactions based on duration: 
testListTransactionsFiltering
    Added test case for kafka-transactions.sh change in TransactionsCommandTest
    
    Reviewers: Justine Olshan <[email protected]>, Raman Verma 
<[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  3 ++-
 .../clients/admin/ListTransactionsOptions.java     | 29 ++++++++++++++++++++--
 .../admin/internals/ListTransactionsHandler.java   |  1 +
 .../common/requests/ListTransactionsRequest.java   |  5 ++++
 .../common/message/ListTransactionsRequest.json    |  6 ++++-
 .../common/message/ListTransactionsResponse.json   |  3 ++-
 .../internals/ListTransactionsHandlerTest.java     | 26 +++++++++++++++++++
 .../transaction/TransactionCoordinator.scala       |  5 ++--
 .../transaction/TransactionStateManager.scala      |  6 ++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  3 ++-
 .../transaction/TransactionStateManagerTest.scala  | 19 ++++++++++----
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  4 +--
 .../apache/kafka/tools/TransactionsCommand.java    | 14 +++++++++--
 .../kafka/tools/TransactionsCommandTest.java       | 21 +++++++++++++---
 14 files changed, 124 insertions(+), 21 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index ff7f4e661d6..ff0e60e766d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1633,7 +1633,8 @@ public interface Admin extends AutoCloseable {
      * coordinators in the cluster and collect the state of all transactions. 
Users
      * should typically attempt to reduce the size of the result set using
      * {@link ListTransactionsOptions#filterProducerIds(Collection)} or
-     * {@link ListTransactionsOptions#filterStates(Collection)}
+     * {@link ListTransactionsOptions#filterStates(Collection)} or
+     * {@link ListTransactionsOptions#filterOnDuration(long)}
      *
      * @param options Options to control the method behavior (including 
filters)
      * @return The result
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
index c23d4441dfd..49cf484f5d6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
@@ -35,6 +35,7 @@ public class ListTransactionsOptions extends 
AbstractOptions<ListTransactionsOpt
     private Set<TransactionState> filteredStates = Collections.emptySet();
     private Set<Long> filteredProducerIds = Collections.emptySet();
 
+    private long filteredDuration = -1L;
     /**
      * Filter only the transactions that are in a specific set of states. If 
no filter
      * is specified or if the passed set of states is empty, then transactions 
in all
@@ -61,6 +62,19 @@ public class ListTransactionsOptions extends 
AbstractOptions<ListTransactionsOpt
         return this;
     }
 
+    /**
+     * Filter only the transactions that are running longer than the specified 
duration.
+     * If no filter is specified or if the passed duration ms is less than 0,
+     * then all transactions will be returned.
+     *
+     * @param durationMs the duration in milliseconds to filter by
+     * @return this object
+     */
+    public ListTransactionsOptions filterOnDuration(long durationMs) {
+        this.filteredDuration = durationMs;
+        return this;
+    }
+
     /**
      * Returns the set of states to be filtered or empty if no states have 
been specified.
      *
@@ -81,11 +95,21 @@ public class ListTransactionsOptions extends 
AbstractOptions<ListTransactionsOpt
         return filteredProducerIds;
     }
 
+    /**
+     * Returns the duration ms value being filtered.
+     *
+     * @return the current duration filter value in ms (negative value means 
transactions are not filtered by duration)
+     */
+    public long filteredDuration() {
+        return filteredDuration;
+    }
+
     @Override
     public String toString() {
         return "ListTransactionsOptions(" +
             "filteredStates=" + filteredStates +
             ", filteredProducerIds=" + filteredProducerIds +
+            ", filteredDuration=" + filteredDuration +
             ", timeoutMs=" + timeoutMs +
             ')';
     }
@@ -96,11 +120,12 @@ public class ListTransactionsOptions extends 
AbstractOptions<ListTransactionsOpt
         if (o == null || getClass() != o.getClass()) return false;
         ListTransactionsOptions that = (ListTransactionsOptions) o;
         return Objects.equals(filteredStates, that.filteredStates) &&
-            Objects.equals(filteredProducerIds, that.filteredProducerIds);
+            Objects.equals(filteredProducerIds, that.filteredProducerIds) &&
+            Objects.equals(filteredDuration, that.filteredDuration);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(filteredStates, filteredProducerIds);
+        return Objects.hash(filteredStates, filteredProducerIds, 
filteredDuration);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
index ca249bca7f4..b77cf762e1b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
@@ -73,6 +73,7 @@ public class ListTransactionsHandler extends 
AdminApiHandler.Batched<AllBrokersS
         request.setStateFilters(options.filteredStates().stream()
             .map(TransactionState::toString)
             .collect(Collectors.toList()));
+        request.setDurationFilter(options.filteredDuration());
         return new ListTransactionsRequest.Builder(request);
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
index 0651f1fe6e5..a5fef3ee7b2 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ListTransactionsRequestData;
 import org.apache.kafka.common.message.ListTransactionsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -35,6 +36,10 @@ public class ListTransactionsRequest extends AbstractRequest 
{
 
         @Override
         public ListTransactionsRequest build(short version) {
+            if (data.durationFilter() >= 0 && version < 1) {
+                throw new UnsupportedVersionException("Duration filter can be 
set only when using API version 1 or higher." +
+                        " If client is connected to an older broker, do not 
specify duration filter or set duration filter to -1.");
+            }
             return new ListTransactionsRequest(data, version);
         }
 
diff --git 
a/clients/src/main/resources/common/message/ListTransactionsRequest.json 
b/clients/src/main/resources/common/message/ListTransactionsRequest.json
index 21f4552cd03..2aeeaa62e28 100644
--- a/clients/src/main/resources/common/message/ListTransactionsRequest.json
+++ b/clients/src/main/resources/common/message/ListTransactionsRequest.json
@@ -18,7 +18,8 @@
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "ListTransactionsRequest",
-  "validVersions": "0",
+  // Version 1: adds DurationFilter to list transactions older than specified 
duration
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "StateFilters", "type": "[]string", "versions": "0+",
@@ -26,6 +27,9 @@
     },
     { "name": "ProducerIdFilters", "type": "[]int64", "versions": "0+", 
"entityType": "producerId",
       "about": "The producerIds to filter by: if empty, all transactions will 
be returned; if non-empty, only transactions which match one of the filtered 
producerIds will be returned"
+    },
+    { "name": "DurationFilter", "type": "int64", "versions": "1+", "default": 
-1,
+      "about": "Duration (in millis) to filter by: if < 0, all transactions 
will be returned; otherwise, only transactions running longer than this 
duration will be returned"
     }
   ]
 }
diff --git 
a/clients/src/main/resources/common/message/ListTransactionsResponse.json 
b/clients/src/main/resources/common/message/ListTransactionsResponse.json
index 2f178732391..e9924801cc0 100644
--- a/clients/src/main/resources/common/message/ListTransactionsResponse.json
+++ b/clients/src/main/resources/common/message/ListTransactionsResponse.json
@@ -17,7 +17,8 @@
   "apiKey": 66,
   "type": "response",
   "name": "ListTransactionsResponse",
-  "validVersions": "0",
+  // Version 1 is the same as version 0 (KIP-994).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
       { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
index 3c54fad65e8..89582529212 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.TransactionState;
 import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
 import org.apache.kafka.clients.admin.internals.AllBrokersStrategy.BrokerKey;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ListTransactionsResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ListTransactionsRequest;
@@ -41,6 +42,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ListTransactionsHandlerTest {
     private final LogContext logContext = new LogContext();
@@ -83,6 +85,30 @@ public class ListTransactionsHandlerTest {
         assertEquals(Collections.emptyList(), 
request.data().producerIdFilters());
     }
 
+    @Test
+    public void testBuildRequestWithDurationFilter() {
+        int brokerId = 1;
+        BrokerKey brokerKey = new BrokerKey(OptionalInt.of(brokerId));
+        ListTransactionsOptions options = new ListTransactionsOptions();
+        ListTransactionsHandler handler = new ListTransactionsHandler(options, 
logContext);
+        // case 1: check the default value (-1L) for durationFilter
+        ListTransactionsRequest request = 
handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build((short) 1);
+        assertEquals(-1L, request.data().durationFilter());
+        request = handler.buildBatchedRequest(brokerId, 
singleton(brokerKey)).build((short) 0);
+        assertEquals(-1L, request.data().durationFilter());
+        // case 2: able to set a valid duration filter when using API version 1
+        options.filterOnDuration(10L);
+        request = handler.buildBatchedRequest(brokerId, 
singleton(brokerKey)).build((short) 1);
+        assertEquals(10L, request.data().durationFilter());
+        assertEquals(Collections.emptyList(), 
request.data().producerIdFilters());
+        // case 3: unable to set a valid duration filter when using API 
version 0
+        assertThrows(UnsupportedVersionException.class, () -> 
handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build((short) 0));
+        // case 4: able to set duration filter to -1L when using API version 0
+        options.filterOnDuration(-1L);
+        ListTransactionsRequest request1 = 
handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build((short) 0);
+        assertEquals(-1L, request1.data().durationFilter());
+    }
+
     @Test
     public void testHandleSuccessfulResponse() {
         int brokerId = 1;
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 74366317b40..c7f5a16b066 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def handleListTransactions(
     filteredProducerIds: Set[Long],
-    filteredStates: Set[String]
+    filteredStates: Set[String],
+    filteredDuration: Long = -1L
   ): ListTransactionsResponseData = {
     if (!isActive.get()) {
       new 
ListTransactionsResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
     } else {
-      txnManager.listTransactionStates(filteredProducerIds, filteredStates)
+      txnManager.listTransactionStates(filteredProducerIds, filteredStates, 
filteredDuration)
     }
   }
 
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index c4e71ca7669..355e8489807 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -301,7 +301,8 @@ class TransactionStateManager(brokerId: Int,
 
   def listTransactionStates(
     filterProducerIds: Set[Long],
-    filterStateNames: Set[String]
+    filterStateNames: Set[String],
+    filterDurationMs: Long
   ): ListTransactionsResponseData = {
     inReadLock(stateLock) {
       val response = new ListTransactionsResponseData()
@@ -316,6 +317,7 @@ class TransactionStateManager(brokerId: Int,
           }
         }
 
+        val now : Long = time.milliseconds()
         def shouldInclude(txnMetadata: TransactionMetadata): Boolean = {
           if (txnMetadata.state == Dead) {
             // We filter the `Dead` state since it is a transient state which
@@ -326,6 +328,8 @@ class TransactionStateManager(brokerId: Int,
             false
           } else if (filterStateNames.nonEmpty && 
!filterStates.contains(txnMetadata.state)) {
             false
+          } else if (filterDurationMs >= 0 && (now - 
txnMetadata.txnStartTimestamp) <= filterDurationMs) {
+            false
           } else {
             true
           }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c1367884272..e376b053e44 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3747,7 +3747,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val listTransactionsRequest = request.body[ListTransactionsRequest]
     val filteredProducerIds = 
listTransactionsRequest.data.producerIdFilters.asScala.map(Long.unbox).toSet
     val filteredStates = 
listTransactionsRequest.data.stateFilters.asScala.toSet
-    val response = txnCoordinator.handleListTransactions(filteredProducerIds, 
filteredStates)
+    val durationFilter = listTransactionsRequest.data.durationFilter()
+    val response = txnCoordinator.handleListTransactions(filteredProducerIds, 
filteredStates, durationFilter)
 
     // The response should contain only transactionalIds that the principal
     // has `Describe` permission to access.
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 94ffe7a9795..907e4c9c17e 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -489,7 +489,8 @@ class TransactionStateManagerTest {
     transactionManager.addLoadingPartition(partitionId = 0, coordinatorEpoch = 
15)
     val listResponse = transactionManager.listTransactionStates(
       filterProducerIds = Set.empty,
-      filterStateNames = Set.empty
+      filterStateNames = Set.empty,
+      -1L
     )
     assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, 
Errors.forCode(listResponse.errorCode))
   }
@@ -513,12 +514,16 @@ class TransactionStateManagerTest {
 
     putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing)
     putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing)
+    // update time to create transactions with various durations
+    time.sleep(1000)
     putTransaction(transactionalId = "t2", producerId = 2, state = 
PrepareCommit)
     putTransaction(transactionalId = "t3", producerId = 3, state = 
PrepareAbort)
+    time.sleep(1000)
     putTransaction(transactionalId = "t4", producerId = 4, state = 
CompleteCommit)
     putTransaction(transactionalId = "t5", producerId = 5, state = 
CompleteAbort)
     putTransaction(transactionalId = "t6", producerId = 6, state = 
CompleteAbort)
     putTransaction(transactionalId = "t7", producerId = 7, state = 
PrepareEpochFence)
+    time.sleep(1000)
     // Note that `Dead` transactions are never returned. This is a transient 
state
     // which is used when the transaction state is in the process of being 
deleted
     // (whether though expiration or coordinator unloading).
@@ -527,16 +532,20 @@ class TransactionStateManagerTest {
     def assertListTransactions(
       expectedTransactionalIds: Set[String],
       filterProducerIds: Set[Long] = Set.empty,
-      filterStates: Set[String] = Set.empty
+      filterStates: Set[String] = Set.empty,
+      filterDuration: Long = -1L
     ): Unit = {
-      val listResponse = 
transactionManager.listTransactionStates(filterProducerIds, filterStates)
+      val listResponse = 
transactionManager.listTransactionStates(filterProducerIds, filterStates, 
filterDuration)
       assertEquals(Errors.NONE, Errors.forCode(listResponse.errorCode))
       assertEquals(expectedTransactionalIds, 
listResponse.transactionStates.asScala.map(_.transactionalId).toSet)
       val expectedUnknownStates = filterStates.filter(state => 
TransactionState.fromName(state).isEmpty)
       assertEquals(expectedUnknownStates, 
listResponse.unknownStateFilters.asScala.toSet)
     }
-
     assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"))
+    assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", 
"t7"), filterDuration = 0L)
+    assertListTransactions(Set("t0", "t1", "t2", "t3"), filterDuration = 1000L)
+    assertListTransactions(Set("t0", "t1"), filterDuration = 2000L)
+    assertListTransactions(Set(), filterDuration = 3000L)
     assertListTransactions(Set("t0", "t1"), filterStates = Set("Ongoing"))
     assertListTransactions(Set("t0", "t1"), filterStates = Set("Ongoing", 
"UnknownState"))
     assertListTransactions(Set("t2", "t4"), filterStates = 
Set("PrepareCommit", "CompleteCommit"))
@@ -843,7 +852,7 @@ class TransactionStateManagerTest {
   }
 
   private def listExpirableTransactionalIds(): Set[String] = {
-    val activeTransactionalIds = 
transactionManager.listTransactionStates(Set.empty, Set.empty)
+    val activeTransactionalIds = 
transactionManager.listTransactionStates(Set.empty, Set.empty, -1L)
       .transactionStates
       .asScala
       .map(_.transactionalId)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f272bd5a23b..90ff37ce329 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -6499,7 +6499,7 @@ class KafkaApisTest extends Logging {
     
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
       any[Long])).thenReturn(0)
 
-    when(txnCoordinator.handleListTransactions(Set.empty[Long], 
Set.empty[String]))
+    when(txnCoordinator.handleListTransactions(Set.empty[Long], 
Set.empty[String], -1L))
       .thenReturn(new ListTransactionsResponseData()
         .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code))
     kafkaApis = createKafkaApis()
@@ -6529,7 +6529,7 @@ class KafkaApisTest extends Logging {
       .setProducerId(98765)
       .setTransactionState("PrepareAbort"))
 
-    when(txnCoordinator.handleListTransactions(Set.empty[Long], 
Set.empty[String]))
+    when(txnCoordinator.handleListTransactions(Set.empty[Long], 
Set.empty[String], -1L))
       .thenReturn(new ListTransactionsResponseData()
         .setErrorCode(Errors.NONE.code)
         .setTransactionStates(transactionStates))
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index 243f28035f4..479db63e18a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -436,16 +436,26 @@ public abstract class TransactionsCommand {
 
         @Override
         public void addSubparser(Subparsers subparsers) {
-            subparsers.addParser(name())
+            Subparser subparser = subparsers.addParser(name())
                 .help("list transactions");
+
+            subparser.addArgument("--duration-filter")
+                    .help("Duration (in millis) to filter by: if < 0, all 
transactions will be returned; " +
+                            "otherwise, only transactions running longer than 
this duration will be returned")
+                    .action(store())
+                    .type(Long.class)
+                    .required(false);
         }
 
         @Override
         public void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception {
+            ListTransactionsOptions options = new ListTransactionsOptions();
+            
Optional.ofNullable(ns.getLong("duration_filter")).ifPresent(options::filterOnDuration);
+
             final Map<Integer, Collection<TransactionListing>> result;
 
             try {
-                result = admin.listTransactions(new ListTransactionsOptions())
+                result = admin.listTransactions(options)
                     .allByBrokerId()
                     .get();
             } catch (ExecutionException e) {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
index ef7b5b2ab99..fde6434cf02 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -187,14 +187,25 @@ public class TransactionsCommandTest {
         assertEquals(expectedRows, new HashSet<>(table.subList(1, 
table.size())));
     }
 
-    @Test
-    public void testListTransactions() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testListTransactions(boolean hasDurationFilter) throws 
Exception {
         String[] args = new String[] {
             "--bootstrap-server",
             "localhost:9092",
             "list"
         };
 
+        if (hasDurationFilter) {
+            args = new String[] {
+                "--bootstrap-server",
+                "localhost:9092",
+                "list",
+                "--duration-filter",
+                Long.toString(Long.MAX_VALUE)
+            };
+        }
+
         Map<Integer, Collection<TransactionListing>> transactions = new 
HashMap<>();
         transactions.put(0, asList(
             new TransactionListing("foo", 12345L, TransactionState.ONGOING),
@@ -204,7 +215,11 @@ public class TransactionsCommandTest {
             new TransactionListing("baz", 13579L, 
TransactionState.COMPLETE_COMMIT)
         ));
 
-        expectListTransactions(transactions);
+        if (hasDurationFilter) {
+            expectListTransactions(new 
ListTransactionsOptions().filterOnDuration(Long.MAX_VALUE), transactions);
+        } else {
+            expectListTransactions(transactions);
+        }
 
         execute(args);
         assertNormalExit();

Reply via email to