This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4731c19ab3e MINOR: Fix invocation of TestUtils::waitForCondition
(#20687)
4731c19ab3e is described below
commit 4731c19ab3e385b9643e85c114410d1a39f87395
Author: Gaurav Narula <[email protected]>
AuthorDate: Mon Oct 13 14:47:33 2025 +0100
MINOR: Fix invocation of TestUtils::waitForCondition (#20687)
There are some calls to `TestUtils::waitForCondition` with the actual
value of the condition being evaluated. These calls must use the
overload with a `Supplier` for conditionDetails so that the actual value
is lazily evaluated at the time of the condition failing.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../src/test/java/org/apache/kafka/clients/ClientsTestUtils.java | 2 +-
.../apache/kafka/clients/admin/ListOffsetsIntegrationTest.java | 8 +++++++-
.../org/apache/kafka/clients/consumer/ConsumerBounceTest.java | 4 ++--
.../apache/kafka/clients/consumer/PlaintextConsumerPollTest.java | 2 +-
.../java/org/apache/kafka/common/network/Tls13SelectorTest.java | 2 +-
.../org/apache/kafka/connect/util/clusters/ConnectAssertions.java | 4 ++--
.../apache/kafka/tiered/storage/actions/ExpectLeaderAction.java | 2 +-
.../streams/integration/EOSUncleanShutdownIntegrationTest.java | 6 +++---
.../integration/HighAvailabilityTaskAssignorIntegrationTest.java | 4 ++--
.../integration/KTableKTableForeignKeyJoinDistributedTest.java | 2 +-
.../kafka/streams/integration/StandbyTaskEOSIntegrationTest.java | 2 +-
.../kafka/streams/integration/utils/IntegrationTestUtils.java | 5 ++---
.../src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 2 +-
.../apache/kafka/tools/consumer/group/ListConsumerGroupTest.java | 2 +-
.../apache/kafka/tools/consumer/group/ShareGroupCommandTest.java | 6 +++---
.../org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java | 2 +-
.../java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java | 8 ++++----
.../apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java | 4 ++--
.../org/apache/kafka/tools/streams/StreamsGroupCommandTest.java | 6 +++---
19 files changed, 39 insertions(+), 34 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
index a007035e860..5635f6e36b7 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
@@ -282,7 +282,7 @@ public class ClientsTestUtils {
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100));
return consumer.assignment().equals(expectedAssignment);
- }, "Timed out while awaiting expected assignment " +
expectedAssignment + ". " +
+ }, () -> "Timed out while awaiting expected assignment " +
expectedAssignment + ". " +
"The current assignment is " + consumer.assignment()
);
}
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
index 678a3b23b46..57939e8dc54 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ListOffsetsIntegrationTest.java
@@ -233,7 +233,13 @@ public class ListOffsetsIntegrationTest {
} catch (InterruptedException | ExecutionException e) {
return false;
}
- }, String.format("expected leader: %d but actual: %d", newLeader,
clusterInstance.getLeaderBrokerId(new TopicPartition(topic, 0))));
+ }, () -> {
+ try {
+ return String.format("expected leader: %d but actual: %d",
newLeader, clusterInstance.getLeaderBrokerId(new TopicPartition(topic, 0)));
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
checkListOffsets(topic, expectedMaxTimestampOffset);
// case 2: test the offsets from recovery path.
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
index 2583d099977..64c3038a1a5 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
@@ -564,7 +564,7 @@ public class ConsumerBounceTest {
assignments.clear();
consumerPollers.forEach(poller ->
assignments.add(poller.consumerAssignment()));
return isPartitionAssignmentValid(assignments, subscriptions,
expectedAssignments);
- }, waitTimeMs, msg.orElse("Did not get valid assignment for partitions
" + subscriptions + ". Instead got: " + assignments));
+ }, waitTimeMs, () -> msg.orElse("Did not get valid assignment for
partitions " + subscriptions + ". Instead got: " + assignments));
}
// Overload for convenience (optional msg and expectedAssignments)
@@ -769,7 +769,7 @@ public class ConsumerBounceTest {
private void receiveExactRecords(ConsumerAssignmentPoller consumer, int
numRecords, long timeoutMs) throws InterruptedException {
TestUtils.waitForCondition(() -> consumer.receivedMessages() ==
numRecords, timeoutMs,
- String.format("Consumer did not receive expected %d. It received
%d", numRecords, consumer.receivedMessages()));
+ () -> String.format("Consumer did not receive expected %d. It
received %d", numRecords, consumer.receivedMessages()));
}
// A mock class to represent broker bouncing (simulate broker restart
behavior)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
index 71853d97814..3383049465e 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
@@ -660,7 +660,7 @@ public class PlaintextConsumerPollTest {
}
return isPartitionAssignmentValid(assignments, subscriptions);
}, GROUP_MAX_SESSION_TIMEOUT_MS * 3,
- msg != null ? msg : "Did not get valid assignment for partitions "
+ subscriptions + ". Instead, got " + assignments
+ () -> msg != null ? msg : "Did not get valid assignment for
partitions " + subscriptions + ". Instead, got " + assignments
);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
b/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
index 0b6003c963a..7adc3d3eb3c 100644
---
a/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
@@ -92,7 +92,7 @@ public class Tls13SelectorTest extends SslSelectorTest {
received.add(receive);
}
return received.size() == 2;
- }, "Expected two receives, got " + received.size());
+ }, () -> "Expected two receives, got " + received.size());
assertEquals(asList("0-0", "0-1"),
received.stream().map(this::asString).collect(Collectors.toList()));
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java
index c901361cb64..9b8156a4ce2 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java
@@ -154,7 +154,7 @@ public class ConnectAssertions {
return actual.isEmpty();
}).orElse(false),
CONNECTOR_SETUP_DURATION_MS,
- "Unexpectedly found topics " + existingTopics.get());
+ () -> "Unexpectedly found topics " + existingTopics.get());
}
/**
@@ -173,7 +173,7 @@ public class ConnectAssertions {
return missing.isEmpty();
}).orElse(false),
CONNECTOR_SETUP_DURATION_MS,
- "Didn't find the topics " + missingTopics.get());
+ () -> "Didn't find the topics " + missingTopics.get());
}
protected Optional<Boolean> checkTopicsExist(Set<String> topicNames,
BiFunction<Set<String>, Set<String>, Boolean> comp) {
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java
index 16c1b4ec8b2..0caf4cd4fad 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java
@@ -82,7 +82,7 @@ public final class ExpectLeaderAction implements
TieredStorageTestAction {
}
throw new RuntimeException(ex);
}
- }, "Leader of " + topicPartition + " was not " + replicaId + ". Actual
leader: " + actualLeader);
+ }, () -> "Leader of " + topicPartition + " was not " + replicaId + ".
Actual leader: " + actualLeader);
}
@Override
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
index a21ee3a1fd6..32f5dd69b75 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
@@ -125,7 +125,7 @@ public class EOSUncleanShutdownIntegrationTest {
driver.start();
TestUtils.waitForCondition(() -> driver.state().equals(State.RUNNING),
- "Expected RUNNING state but driver is on " + driver.state());
+ () -> "Expected RUNNING state but driver is on " +
driver.state());
// Task's StateDir
final File taskStateDir = new File(String.join("/",
TEST_FOLDER.getPath(), appId, "0_0"));
@@ -143,12 +143,12 @@ public class EOSUncleanShutdownIntegrationTest {
new KeyValueTimestamp<>("k3", "v3", 2L)));
TestUtils.waitForCondition(() -> recordCount.get() == RECORD_TOTAL,
- "Expected " + RECORD_TOTAL + " records processed but only got
" + recordCount.get());
+ () -> "Expected " + RECORD_TOTAL + " records processed but
only got " + recordCount.get());
} catch (final Exception e) {
e.printStackTrace();
} finally {
TestUtils.waitForCondition(() ->
driver.state().equals(State.ERROR),
- "Expected ERROR state but driver is on " + driver.state());
+ () -> "Expected ERROR state but driver is on " +
driver.state());
driver.close();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index c57d7920faa..42ad0277d5d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -236,13 +236,13 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
}
},
120_000L,
- "Never saw a first assignment after scale out: " +
assignmentsCompleted.get()
+ () -> "Never saw a first assignment after scale out: " +
assignmentsCompleted.get()
);
TestUtils.waitForCondition(
assignmentStable::get,
120_000L,
- "Assignment hasn't become stable: " +
assignmentsCompleted.get() +
+ () -> "Assignment hasn't become stable: " +
assignmentsCompleted.get() +
" Note, if this does fail, check and see if the new
instance just failed to catch up within" +
" the probing rebalance interval. A full minute should be
long enough to read ~500 records" +
" in any test environment, but you never know..."
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index 637daad1e0c..84d9c9d0fcd 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -232,7 +232,7 @@ public class KTableKTableForeignKeyJoinDistributedTest {
private void waitUntilBothClientAreOK(final String message) throws
Exception {
TestUtils.waitForCondition(() -> client1IsOk && client2IsOk,
30 * 1000,
- message + ": "
+ () -> message + ": "
+ "Client 1 is " + (!client1IsOk ? "NOT " : "") + "OK,
"
+ "client 2 is " + (!client2IsOk ? "NOT " : "") + "OK."
);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index cecf49bdfaa..67435c3afeb 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -317,7 +317,7 @@ public class StandbyTaskEOSIntegrationTest {
);
waitForCondition(
() -> streamInstanceOneRecovery.state() ==
KafkaStreams.State.ERROR,
- "Stream instance 1 did not go into error state. Is in " +
streamInstanceOneRecovery.state() + " state."
+ () -> "Stream instance 1 did not go into error state. Is in "
+ streamInstanceOneRecovery.state() + " state."
);
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 44ea3c7fc89..832e8eb1a06 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -848,9 +848,8 @@ public class IntegrationTestUtils {
return finalAccumData.equals(finalExpected);
};
- final String conditionDetails = "Did not receive all " +
expectedRecords + " records from topic " +
- topic + " (got " + accumData + ")";
- TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+ TestUtils.waitForCondition(valuesRead, waitTime, () -> "Did not
receive all " + expectedRecords + " records from topic " +
+ topic + " (got " + accumData + ")");
}
return accumData;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 83eeb0befc6..14d4cf8c21f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -1572,7 +1572,7 @@ public class KafkaStreamsTest {
streams.start();
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
- "Streams never started, state is " + streams.state());
+ () -> "Streams never started, state is " +
streams.state());
streams.close();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
index 8775d26869e..896ed067062 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
@@ -113,7 +113,7 @@ public class ListConsumerGroupTest {
TestUtils.waitForCondition(() -> {
foundGroups.set(set(service.listConsumerGroups()));
return Objects.equals(expectedGroups, foundGroups.get());
- }, "Expected --list to show groups " + expectedGroups + ", but
found " + foundGroups.get() + ".");
+ }, () -> "Expected --list to show groups " + expectedGroups +
", but found " + foundGroups.get() + ".");
}
removeConsumer(set(List.of(topicPartitionsGroup, topicGroup,
protocolGroup)));
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 03308d94c39..3b1843b95cd 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -139,7 +139,7 @@ public class ShareGroupCommandTest {
TestUtils.waitForCondition(() -> {
foundGroups[0] = new HashSet<>(service.listShareGroups());
return Objects.equals(expectedGroups, foundGroups[0]);
- }, "Expected --list to show groups " + expectedGroups + ", but
found " + foundGroups[0] + ".");
+ }, () -> "Expected --list to show groups " + expectedGroups + ",
but found " + foundGroups[0] + ".");
}
}
@@ -166,7 +166,7 @@ public class ShareGroupCommandTest {
TestUtils.waitForCondition(() -> {
foundListing[0] = new
HashSet<>(service.listShareGroupsInStates(Set.of(GroupState.values())));
return Objects.equals(expectedListing, foundListing[0]);
- }, "Expected to show groups " + expectedListing + ", but found " +
foundListing[0]);
+ }, () -> "Expected to show groups " + expectedListing + ", but
found " + foundListing[0]);
ListGroupsResult resultWithStableState =
mock(ListGroupsResult.class);
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(List.of(
@@ -181,7 +181,7 @@ public class ShareGroupCommandTest {
TestUtils.waitForCondition(() -> {
foundListing[0] = new
HashSet<>(service.listShareGroupsInStates(Set.of(GroupState.STABLE)));
return Objects.equals(expectedListingStable, foundListing[0]);
- }, "Expected to show groups " + expectedListingStable + ", but
found " + foundListing[0]);
+ }, () -> "Expected to show groups " + expectedListingStable + ",
but found " + foundListing[0]);
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
index 80d1806ed6c..8730d23066e 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
@@ -506,7 +506,7 @@ public class DeleteStreamsGroupTest {
"The group did not become stable as expected."
);
TestUtils.waitForCondition(() -> recordCount.get() == RECORD_TOTAL,
- "Expected " + RECORD_TOTAL + " records processed but only got " +
recordCount.get());
+ () -> "Expected " + RECORD_TOTAL + " records processed but
only got " + recordCount.get());
return streams;
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
index fcb971d35c7..57dc8941f2b 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
@@ -108,7 +108,7 @@ public class ListStreamsGroupTest {
TestUtils.waitForCondition(() -> {
foundGroups.set(new HashSet<>(service.listStreamsGroups()));
return Objects.equals(expectedGroups, foundGroups.get());
- }, "Expected --list to show streams groups " + expectedGroups + ",
but found " + foundGroups.get() + ".");
+ }, () -> "Expected --list to show streams groups " +
expectedGroups + ", but found " + foundGroups.get() + ".");
}
}
@@ -135,7 +135,7 @@ public class ListStreamsGroupTest {
TestUtils.waitForCondition(() -> {
foundListing.set(new
HashSet<>(service.listStreamsGroupsInStates(Set.of())));
return Objects.equals(expectedListing, foundListing.get());
- }, "Expected --list to show streams groups " + expectedListing +
", but found " + foundListing.get() + ".");
+ }, () -> "Expected --list to show streams groups " +
expectedListing + ", but found " + foundListing.get() + ".");
}
}
@@ -155,7 +155,7 @@ public class ListStreamsGroupTest {
TestUtils.waitForCondition(() -> {
foundListing.set(new
HashSet<>(service.listStreamsGroupsInStates(Set.of())));
return Objects.equals(expectedListing, foundListing.get());
- }, "Expected --list to show streams groups " + expectedListing +
", but found " + foundListing.get() + ".");
+ }, () -> "Expected --list to show streams groups " +
expectedListing + ", but found " + foundListing.get() + ".");
}
try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(new String[]{"--bootstrap-server",
cluster.bootstrapServers(), "--list", "--state", "PreparingRebalance"})) {
@@ -166,7 +166,7 @@ public class ListStreamsGroupTest {
TestUtils.waitForCondition(() -> {
foundListing.set(new
HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.PREPARING_REBALANCE))));
return Objects.equals(expectedListing, foundListing.get());
- }, "Expected --list to show streams groups " + expectedListing +
", but found " + foundListing.get() + ".");
+ }, () -> "Expected --list to show streams groups " +
expectedListing + ", but found " + foundListing.get() + ".");
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
index 2838e496b59..ed888abe378 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
@@ -688,12 +688,12 @@ public class ResetStreamsGroupOffsetTest {
TestUtils.waitForCondition(() ->
streams.state().equals(KafkaStreams.State.RUNNING),
- "Expected RUNNING state but streams is on " + streams.state());
+ () -> "Expected RUNNING state but streams is on " +
streams.state());
try {
TestUtils.waitForCondition(() -> recordCount.get() ==
numOfCommittedMessages,
- "Expected " + numOfCommittedMessages + " records processed but
only got " + recordCount.get());
+ () -> "Expected " + numOfCommittedMessages + " records
processed but only got " + recordCount.get());
} catch (final Exception e) {
e.printStackTrace();
} finally {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index 4f1e116437e..c91a4b2090b 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -105,7 +105,7 @@ public class StreamsGroupCommandTest {
TestUtils.waitForCondition(() -> {
foundGroups[0] = new HashSet<>(service.listStreamsGroups());
return Objects.equals(expectedGroups, foundGroups[0]);
- }, "Expected --list to show groups " + expectedGroups + ", but found "
+ foundGroups[0] + ".");
+ }, () -> "Expected --list to show groups " + expectedGroups + ", but
found " + foundGroups[0] + ".");
service.close();
}
@@ -138,7 +138,7 @@ public class StreamsGroupCommandTest {
TestUtils.waitForCondition(() -> {
foundListing[0] = new
HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.values())));
return Objects.equals(expectedListing, foundListing[0]);
- }, "Expected to show groups " + expectedListing + ", but found " +
foundListing[0]);
+ }, () -> "Expected to show groups " + expectedListing + ", but found "
+ foundListing[0]);
ListGroupsResult resultWithStableState = mock(ListGroupsResult.class);
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(List.of(
@@ -153,7 +153,7 @@ public class StreamsGroupCommandTest {
TestUtils.waitForCondition(() -> {
foundListing[0] = new
HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.STABLE)));
return Objects.equals(expectedListingStable, foundListing[0]);
- }, "Expected to show groups " + expectedListingStable + ", but found "
+ foundListing[0]);
+ }, () -> "Expected to show groups " + expectedListingStable + ", but
found " + foundListing[0]);
service.close();
}