This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4e96913cc6 KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to
ListTransactionsRequest (#15384)
b4e96913cc6 is described below
commit b4e96913cc6c827968e47a31261e0bd8fdf677b5
Author: Yang Yu <[email protected]>
AuthorDate: Sat Feb 24 08:09:23 2024 -0600
KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest
(#15384)
Introduces a new filter in ListTransactionsRequest API. This enables caller
to filter on transactions that have been running for longer than a certain
duration of time.
This PR includes the following changes:
bumps version for ListTransactionsRequest API to 1. Set the durationFilter
to -1L when communicating with an older broker that does not support version 1.
bumps version for ListTransactionsResponse to 1 without changing the
response structure.
adds durationFilter option to kafka-transactions.sh --list
Tests:
Client side test to build request with correct combination of duration
filter and API version: testBuildRequestWithDurationFilter
Server side test to filter transactions based on duration:
testListTransactionsFiltering
Added test case for kafka-transactions.sh change in TransactionsCommandTest
Reviewers: Justine Olshan <[email protected]>, Raman Verma
<[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 3 ++-
.../clients/admin/ListTransactionsOptions.java | 29 ++++++++++++++++++++--
.../admin/internals/ListTransactionsHandler.java | 1 +
.../common/requests/ListTransactionsRequest.java | 5 ++++
.../common/message/ListTransactionsRequest.json | 6 ++++-
.../common/message/ListTransactionsResponse.json | 3 ++-
.../internals/ListTransactionsHandlerTest.java | 26 +++++++++++++++++++
.../transaction/TransactionCoordinator.scala | 5 ++--
.../transaction/TransactionStateManager.scala | 6 ++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 3 ++-
.../transaction/TransactionStateManagerTest.scala | 19 ++++++++++----
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +--
.../apache/kafka/tools/TransactionsCommand.java | 14 +++++++++--
.../kafka/tools/TransactionsCommandTest.java | 21 +++++++++++++---
14 files changed, 124 insertions(+), 21 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index ff7f4e661d6..ff0e60e766d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1633,7 +1633,8 @@ public interface Admin extends AutoCloseable {
* coordinators in the cluster and collect the state of all transactions.
Users
* should typically attempt to reduce the size of the result set using
* {@link ListTransactionsOptions#filterProducerIds(Collection)} or
- * {@link ListTransactionsOptions#filterStates(Collection)}
+ * {@link ListTransactionsOptions#filterStates(Collection)} or
+ * {@link ListTransactionsOptions#durationFilter(Long)}
*
* @param options Options to control the method behavior (including
filters)
* @return The result
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
index c23d4441dfd..49cf484f5d6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
@@ -35,6 +35,7 @@ public class ListTransactionsOptions extends
AbstractOptions<ListTransactionsOpt
private Set<TransactionState> filteredStates = Collections.emptySet();
private Set<Long> filteredProducerIds = Collections.emptySet();
+ private long filteredDuration = -1L;
/**
* Filter only the transactions that are in a specific set of states. If
no filter
* is specified or if the passed set of states is empty, then transactions
in all
@@ -61,6 +62,19 @@ public class ListTransactionsOptions extends
AbstractOptions<ListTransactionsOpt
return this;
}
+ /**
+ * Filter only the transactions that are running longer than the specified
duration.
+ * If no filter is specified or if the passed duration ms is less than 0,
+ * then the all transactions will be returned.
+ *
+ * @param durationMs the duration in milliseconds to filter by
+ * @return this object
+ */
+ public ListTransactionsOptions filterOnDuration(long durationMs) {
+ this.filteredDuration = durationMs;
+ return this;
+ }
+
/**
* Returns the set of states to be filtered or empty if no states have
been specified.
*
@@ -81,11 +95,21 @@ public class ListTransactionsOptions extends
AbstractOptions<ListTransactionsOpt
return filteredProducerIds;
}
+ /**
+ * Returns the duration ms value being filtered.
+ *
+ * @return the current duration filter value in ms (negative value means
transactions are not filtered by duration)
+ */
+ public long filteredDuration() {
+ return filteredDuration;
+ }
+
@Override
public String toString() {
return "ListTransactionsOptions(" +
"filteredStates=" + filteredStates +
", filteredProducerIds=" + filteredProducerIds +
+ ", filteredDuration=" + filteredDuration +
", timeoutMs=" + timeoutMs +
')';
}
@@ -96,11 +120,12 @@ public class ListTransactionsOptions extends
AbstractOptions<ListTransactionsOpt
if (o == null || getClass() != o.getClass()) return false;
ListTransactionsOptions that = (ListTransactionsOptions) o;
return Objects.equals(filteredStates, that.filteredStates) &&
- Objects.equals(filteredProducerIds, that.filteredProducerIds);
+ Objects.equals(filteredProducerIds, that.filteredProducerIds) &&
+ Objects.equals(filteredDuration, that.filteredDuration);
}
@Override
public int hashCode() {
- return Objects.hash(filteredStates, filteredProducerIds);
+ return Objects.hash(filteredStates, filteredProducerIds,
filteredDuration);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
index ca249bca7f4..b77cf762e1b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
@@ -73,6 +73,7 @@ public class ListTransactionsHandler extends
AdminApiHandler.Batched<AllBrokersS
request.setStateFilters(options.filteredStates().stream()
.map(TransactionState::toString)
.collect(Collectors.toList()));
+ request.setDurationFilter(options.filteredDuration());
return new ListTransactionsRequest.Builder(request);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
index 0651f1fe6e5..a5fef3ee7b2 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListTransactionsRequestData;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -35,6 +36,10 @@ public class ListTransactionsRequest extends AbstractRequest
{
@Override
public ListTransactionsRequest build(short version) {
+ if (data.durationFilter() >= 0 && version < 1) {
+ throw new UnsupportedVersionException("Duration filter can be
set only when using API version 1 or higher." +
+ " If client is connected to an older broker, do not
specify duration filter or set duration filter to -1.");
+ }
return new ListTransactionsRequest(data, version);
}
diff --git
a/clients/src/main/resources/common/message/ListTransactionsRequest.json
b/clients/src/main/resources/common/message/ListTransactionsRequest.json
index 21f4552cd03..2aeeaa62e28 100644
--- a/clients/src/main/resources/common/message/ListTransactionsRequest.json
+++ b/clients/src/main/resources/common/message/ListTransactionsRequest.json
@@ -18,7 +18,8 @@
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ListTransactionsRequest",
- "validVersions": "0",
+ // Version 1: adds DurationFilter to list transactions older than specified
duration
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "StateFilters", "type": "[]string", "versions": "0+",
@@ -26,6 +27,9 @@
},
{ "name": "ProducerIdFilters", "type": "[]int64", "versions": "0+",
"entityType": "producerId",
"about": "The producerIds to filter by: if empty, all transactions will
be returned; if non-empty, only transactions which match one of the filtered
producerIds will be returned"
+ },
+ { "name": "DurationFilter", "type": "int64", "versions": "1+", "default":
-1,
+ "about": "Duration (in millis) to filter by: if < 0, all transactions
will be returned; otherwise, only transactions running longer than this
duration will be returned"
}
]
}
diff --git
a/clients/src/main/resources/common/message/ListTransactionsResponse.json
b/clients/src/main/resources/common/message/ListTransactionsResponse.json
index 2f178732391..e9924801cc0 100644
--- a/clients/src/main/resources/common/message/ListTransactionsResponse.json
+++ b/clients/src/main/resources/common/message/ListTransactionsResponse.json
@@ -17,7 +17,8 @@
"apiKey": 66,
"type": "response",
"name": "ListTransactionsResponse",
- "validVersions": "0",
+ // Version 1 is the same as version 0 (KIP-994).
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
index 3c54fad65e8..89582529212 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.TransactionState;
import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
import org.apache.kafka.clients.admin.internals.AllBrokersStrategy.BrokerKey;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListTransactionsRequest;
@@ -41,6 +42,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class ListTransactionsHandlerTest {
private final LogContext logContext = new LogContext();
@@ -83,6 +85,30 @@ public class ListTransactionsHandlerTest {
assertEquals(Collections.emptyList(),
request.data().producerIdFilters());
}
+ @Test
+ public void testBuildRequestWithDurationFilter() {
+ int brokerId = 1;
+ BrokerKey brokerKey = new BrokerKey(OptionalInt.of(brokerId));
+ ListTransactionsOptions options = new ListTransactionsOptions();
+ ListTransactionsHandler handler = new ListTransactionsHandler(options,
logContext);
+ // case 1: check the default value (-1L) for durationFilter
+ ListTransactionsRequest request =
handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build((short) 1);
+ assertEquals(-1L, request.data().durationFilter());
+ request = handler.buildBatchedRequest(brokerId,
singleton(brokerKey)).build((short) 0);
+ assertEquals(-1L, request.data().durationFilter());
+ // case 2: able to set a valid duration filter when using API version 1
+ options.filterOnDuration(10L);
+ request = handler.buildBatchedRequest(brokerId,
singleton(brokerKey)).build((short) 1);
+ assertEquals(10L, request.data().durationFilter());
+ assertEquals(Collections.emptyList(),
request.data().producerIdFilters());
+ // case 3: unable to set a valid duration filter when using API
version 0
+ assertThrows(UnsupportedVersionException.class, () ->
handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build((short) 0));
+ // case 4: able to set duration filter to -1L when using API version 0
+ options.filterOnDuration(-1L);
+ ListTransactionsRequest request1 =
handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build((short) 0);
+ assertEquals(-1L, request1.data().durationFilter());
+ }
+
@Test
public void testHandleSuccessfulResponse() {
int brokerId = 1;
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 74366317b40..c7f5a16b066 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
def handleListTransactions(
filteredProducerIds: Set[Long],
- filteredStates: Set[String]
+ filteredStates: Set[String],
+ filteredDuration: Long = -1L
): ListTransactionsResponseData = {
if (!isActive.get()) {
new
ListTransactionsResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
} else {
- txnManager.listTransactionStates(filteredProducerIds, filteredStates)
+ txnManager.listTransactionStates(filteredProducerIds, filteredStates,
filteredDuration)
}
}
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index c4e71ca7669..355e8489807 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -301,7 +301,8 @@ class TransactionStateManager(brokerId: Int,
def listTransactionStates(
filterProducerIds: Set[Long],
- filterStateNames: Set[String]
+ filterStateNames: Set[String],
+ filterDurationMs: Long
): ListTransactionsResponseData = {
inReadLock(stateLock) {
val response = new ListTransactionsResponseData()
@@ -316,6 +317,7 @@ class TransactionStateManager(brokerId: Int,
}
}
+ val now : Long = time.milliseconds()
def shouldInclude(txnMetadata: TransactionMetadata): Boolean = {
if (txnMetadata.state == Dead) {
// We filter the `Dead` state since it is a transient state which
@@ -326,6 +328,8 @@ class TransactionStateManager(brokerId: Int,
false
} else if (filterStateNames.nonEmpty &&
!filterStates.contains(txnMetadata.state)) {
false
+ } else if (filterDurationMs >= 0 && (now -
txnMetadata.txnStartTimestamp) <= filterDurationMs) {
+ false
} else {
true
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c1367884272..e376b053e44 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3747,7 +3747,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val listTransactionsRequest = request.body[ListTransactionsRequest]
val filteredProducerIds =
listTransactionsRequest.data.producerIdFilters.asScala.map(Long.unbox).toSet
val filteredStates =
listTransactionsRequest.data.stateFilters.asScala.toSet
- val response = txnCoordinator.handleListTransactions(filteredProducerIds,
filteredStates)
+ val durationFilter = listTransactionsRequest.data.durationFilter()
+ val response = txnCoordinator.handleListTransactions(filteredProducerIds,
filteredStates, durationFilter)
// The response should contain only transactionalIds that the principal
// has `Describe` permission to access.
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 94ffe7a9795..907e4c9c17e 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -489,7 +489,8 @@ class TransactionStateManagerTest {
transactionManager.addLoadingPartition(partitionId = 0, coordinatorEpoch =
15)
val listResponse = transactionManager.listTransactionStates(
filterProducerIds = Set.empty,
- filterStateNames = Set.empty
+ filterStateNames = Set.empty,
+ -1L
)
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS,
Errors.forCode(listResponse.errorCode))
}
@@ -513,12 +514,16 @@ class TransactionStateManagerTest {
putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing)
putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing)
+ // update time to create transactions with various durations
+ time.sleep(1000)
putTransaction(transactionalId = "t2", producerId = 2, state =
PrepareCommit)
putTransaction(transactionalId = "t3", producerId = 3, state =
PrepareAbort)
+ time.sleep(1000)
putTransaction(transactionalId = "t4", producerId = 4, state =
CompleteCommit)
putTransaction(transactionalId = "t5", producerId = 5, state =
CompleteAbort)
putTransaction(transactionalId = "t6", producerId = 6, state =
CompleteAbort)
putTransaction(transactionalId = "t7", producerId = 7, state =
PrepareEpochFence)
+ time.sleep(1000)
// Note that `Dead` transactions are never returned. This is a transient
state
// which is used when the transaction state is in the process of being
deleted
// (whether though expiration or coordinator unloading).
@@ -527,16 +532,20 @@ class TransactionStateManagerTest {
def assertListTransactions(
expectedTransactionalIds: Set[String],
filterProducerIds: Set[Long] = Set.empty,
- filterStates: Set[String] = Set.empty
+ filterStates: Set[String] = Set.empty,
+ filterDuration: Long = -1L
): Unit = {
- val listResponse =
transactionManager.listTransactionStates(filterProducerIds, filterStates)
+ val listResponse =
transactionManager.listTransactionStates(filterProducerIds, filterStates,
filterDuration)
assertEquals(Errors.NONE, Errors.forCode(listResponse.errorCode))
assertEquals(expectedTransactionalIds,
listResponse.transactionStates.asScala.map(_.transactionalId).toSet)
val expectedUnknownStates = filterStates.filter(state =>
TransactionState.fromName(state).isEmpty)
assertEquals(expectedUnknownStates,
listResponse.unknownStateFilters.asScala.toSet)
}
-
assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"))
+ assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6",
"t7"), filterDuration = 0L)
+ assertListTransactions(Set("t0", "t1", "t2", "t3"), filterDuration = 1000L)
+ assertListTransactions(Set("t0", "t1"), filterDuration = 2000L)
+ assertListTransactions(Set(), filterDuration = 3000L)
assertListTransactions(Set("t0", "t1"), filterStates = Set("Ongoing"))
assertListTransactions(Set("t0", "t1"), filterStates = Set("Ongoing",
"UnknownState"))
assertListTransactions(Set("t2", "t4"), filterStates =
Set("PrepareCommit", "CompleteCommit"))
@@ -843,7 +852,7 @@ class TransactionStateManagerTest {
}
private def listExpirableTransactionalIds(): Set[String] = {
- val activeTransactionalIds =
transactionManager.listTransactionStates(Set.empty, Set.empty)
+ val activeTransactionalIds =
transactionManager.listTransactionStates(Set.empty, Set.empty, -1L)
.transactionStates
.asScala
.map(_.transactionalId)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f272bd5a23b..90ff37ce329 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -6499,7 +6499,7 @@ class KafkaApisTest extends Logging {
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
- when(txnCoordinator.handleListTransactions(Set.empty[Long],
Set.empty[String]))
+ when(txnCoordinator.handleListTransactions(Set.empty[Long],
Set.empty[String], -1L))
.thenReturn(new ListTransactionsResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code))
kafkaApis = createKafkaApis()
@@ -6529,7 +6529,7 @@ class KafkaApisTest extends Logging {
.setProducerId(98765)
.setTransactionState("PrepareAbort"))
- when(txnCoordinator.handleListTransactions(Set.empty[Long],
Set.empty[String]))
+ when(txnCoordinator.handleListTransactions(Set.empty[Long],
Set.empty[String], -1L))
.thenReturn(new ListTransactionsResponseData()
.setErrorCode(Errors.NONE.code)
.setTransactionStates(transactionStates))
diff --git
a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index 243f28035f4..479db63e18a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -436,16 +436,26 @@ public abstract class TransactionsCommand {
@Override
public void addSubparser(Subparsers subparsers) {
- subparsers.addParser(name())
+ Subparser subparser = subparsers.addParser(name())
.help("list transactions");
+
+ subparser.addArgument("--duration-filter")
+ .help("Duration (in millis) to filter by: if < 0, all
transactions will be returned; " +
+ "otherwise, only transactions running longer than
this duration will be returned")
+ .action(store())
+ .type(Long.class)
+ .required(false);
}
@Override
public void execute(Admin admin, Namespace ns, PrintStream out) throws
Exception {
+ ListTransactionsOptions options = new ListTransactionsOptions();
+
Optional.ofNullable(ns.getLong("duration_filter")).ifPresent(options::filterOnDuration);
+
final Map<Integer, Collection<TransactionListing>> result;
try {
- result = admin.listTransactions(new ListTransactionsOptions())
+ result = admin.listTransactions(options)
.allByBrokerId()
.get();
} catch (ExecutionException e) {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
index ef7b5b2ab99..fde6434cf02 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -187,14 +187,25 @@ public class TransactionsCommandTest {
assertEquals(expectedRows, new HashSet<>(table.subList(1,
table.size())));
}
- @Test
- public void testListTransactions() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testListTransactions(boolean hasDurationFilter) throws
Exception {
String[] args = new String[] {
"--bootstrap-server",
"localhost:9092",
"list"
};
+ if (hasDurationFilter) {
+ args = new String[] {
+ "--bootstrap-server",
+ "localhost:9092",
+ "list",
+ "--duration-filter",
+ Long.toString(Long.MAX_VALUE)
+ };
+ }
+
Map<Integer, Collection<TransactionListing>> transactions = new
HashMap<>();
transactions.put(0, asList(
new TransactionListing("foo", 12345L, TransactionState.ONGOING),
@@ -204,7 +215,11 @@ public class TransactionsCommandTest {
new TransactionListing("baz", 13579L,
TransactionState.COMPLETE_COMMIT)
));
- expectListTransactions(transactions);
+ if (hasDurationFilter) {
+ expectListTransactions(new
ListTransactionsOptions().filterOnDuration(Long.MAX_VALUE), transactions);
+ } else {
+ expectListTransactions(transactions);
+ }
execute(args);
assertNormalExit();