This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new ba4eed1 MINOR: Adjust parameter ordering of `waitForCondition` and
`retryOnExceptionWithTimeout` (#10759) (#10776)
ba4eed1 is described below
commit ba4eed1755b5bad8e7052b28eedb8cc8caae6fff
Author: Ismael Juma <[email protected]>
AuthorDate: Thu May 27 09:57:57 2021 -0700
MINOR: Adjust parameter ordering of `waitForCondition` and
`retryOnExceptionWithTimeout` (#10759) (#10776)
New parameters in overloaded methods should appear after the existing
ones, except for lambdas, which should always be last.
Reviewers: Chia-Ping Tsai <[email protected]>
---
clients/src/test/java/org/apache/kafka/test/TestUtils.java | 10 +++++-----
.../org/apache/kafka/controller/QuorumControllerTestEnv.java | 2 +-
.../java/org/apache/kafka/metalog/LocalLogManagerTest.java | 2 +-
.../java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java | 2 +-
.../streams/integration/OptimizedKTableIntegrationTest.java | 4 ++--
5 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index bac8a2b..d06641f 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -316,7 +316,7 @@ public class TestUtils {
*/
public static void retryOnExceptionWithTimeout(final long timeoutMs,
final ValuelessCallable
runnable) throws InterruptedException {
- retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, timeoutMs,
runnable);
+ retryOnExceptionWithTimeout(timeoutMs, DEFAULT_POLL_INTERVAL_MS,
runnable);
}
/**
@@ -328,7 +328,7 @@ public class TestUtils {
* @throws InterruptedException if the current thread is interrupted while
waiting for {@code runnable} to complete successfully.
*/
public static void retryOnExceptionWithTimeout(final ValuelessCallable
runnable) throws InterruptedException {
- retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS,
DEFAULT_MAX_WAIT_MS, runnable);
+ retryOnExceptionWithTimeout(DEFAULT_MAX_WAIT_MS,
DEFAULT_POLL_INTERVAL_MS, runnable);
}
/**
@@ -336,13 +336,13 @@ public class TestUtils {
* {@link AssertionError}s, or for the given timeout to expire. If the
timeout expires then the
* last exception or assertion failure will be thrown thus providing
context for the failure.
*
- * @param pollIntervalMs the interval in milliseconds to wait between
invoking {@code runnable}.
* @param timeoutMs the total time in milliseconds to wait for {@code
runnable} to complete successfully.
+ * @param pollIntervalMs the interval in milliseconds to wait between
invoking {@code runnable}.
* @param runnable the code to attempt to execute successfully.
* @throws InterruptedException if the current thread is interrupted while
waiting for {@code runnable} to complete successfully.
*/
- public static void retryOnExceptionWithTimeout(final long pollIntervalMs,
- final long timeoutMs,
+ public static void retryOnExceptionWithTimeout(final long timeoutMs,
+ final long pollIntervalMs,
final ValuelessCallable
runnable) throws InterruptedException {
final long expectedEnd = System.currentTimeMillis() + timeoutMs;
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 9927042..20030ab 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -53,7 +53,7 @@ public class QuorumControllerTestEnv implements AutoCloseable
{
QuorumController activeController() throws InterruptedException {
AtomicReference<QuorumController> value = new AtomicReference<>(null);
- TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
+ TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
QuorumController activeController = null;
for (QuorumController controller : controllers) {
if (controller.isActive()) {
diff --git
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
index ac578fb..927004c 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
@@ -92,7 +92,7 @@ public class LocalLogManagerTest {
private static void waitForLastCommittedOffset(long targetOffset,
LocalLogManager logManager) throws InterruptedException {
- TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
+ TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
MockMetaLogManagerListener listener =
(MockMetaLogManagerListener) logManager.listeners().get(0);
long highestOffset = -1;
diff --git
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index 52aeea0..21e0fdb 100644
---
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -102,7 +102,7 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
MetaLogLeader waitForLeader() throws InterruptedException {
AtomicReference<MetaLogLeader> value = new AtomicReference<>(null);
- TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
+ TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
MetaLogLeader result = null;
for (LocalLogManager logManager : logManagers) {
MetaLogLeader leader = logManager.leader();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index cefb712..933e7c3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -134,7 +134,7 @@ public class OptimizedKTableIntegrationTest {
}
final ReadOnlyKeyValueStore<Integer, Integer> newActiveStore =
kafkaStreams1WasFirstActive ? store2 : store1;
- TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
+ TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
// Assert that after failover we have recovered to the last store
write
assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages -
1)));
});
@@ -146,7 +146,7 @@ public class OptimizedKTableIntegrationTest {
// Assert that all messages in the second batch were processed in a
timely manner
assertThat(semaphore.tryAcquire(batch2NumMessages, 60,
TimeUnit.SECONDS), is(equalTo(true)));
- TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
+ TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
// Assert that the current value in store reflects all messages
being processed
assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages -
1)));
});