This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e90b9bd26a6 MINOR: Replace FutureUtils.failedFuture with
CompletableFuture.failedFuture (#21447)
e90b9bd26a6 is described below
commit e90b9bd26a607a218eda7e958955b89bcf95b0d8
Author: majialong <[email protected]>
AuthorDate: Wed Feb 11 11:21:04 2026 +0800
MINOR: Replace FutureUtils.failedFuture with CompletableFuture.failedFuture
(#21447)
Remove `FutureUtils.failedFuture` and use Java's built-in
`CompletableFuture.failedFuture` instead.
Reviewers: Sean Quah <[email protected]>, Ken Huang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../runtime/CoordinatorExecutorImplTest.java | 5 +-
.../common/runtime/CoordinatorRuntimeTest.java | 2 +-
.../common/runtime/CoordinatorTimerImplTest.java | 17 ++++---
.../kafka/server/share/SharePartitionManager.java | 3 +-
.../server/share/SharePartitionManagerTest.java | 19 ++++----
.../kafka/server/share/SharePartitionTest.java | 5 +-
.../unit/kafka/server/ControllerApisTest.scala | 3 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 18 +++----
.../coordinator/group/GroupCoordinatorService.java | 4 +-
.../group/GroupCoordinatorServiceTest.java | 55 +++++++++++-----------
.../org/apache/kafka/server/util/FutureUtils.java | 12 -----
.../share/ShareCoordinatorServiceTest.java | 13 +++--
12 files changed, 68 insertions(+), 88 deletions(-)
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
index c6e1e763798..da0f4697c51 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.server.util.FutureUtils;
import org.junit.jupiter.api.Test;
@@ -215,7 +214,7 @@ public class CoordinatorExecutorImplTest {
CoordinatorShardScheduler.WriteOperation<String> op =
args.getArgument(1);
Throwable ex = assertThrows(RejectedExecutionException.class,
op::generate);
- return FutureUtils.failedFuture(ex);
+ return CompletableFuture.failedFuture(ex);
});
when(executorService.submit(any(Runnable.class))).thenAnswer(args -> {
@@ -259,7 +258,7 @@ public class CoordinatorExecutorImplTest {
when(scheduler.scheduleWriteOperation(
eq(TASK_KEY),
any()
- )).thenReturn(FutureUtils.failedFuture(new Throwable("Oh no!")));
+ )).thenReturn(CompletableFuture.failedFuture(new Throwable("Oh no!")));
when(executorService.submit(any(Runnable.class))).thenAnswer(args -> {
Runnable op = args.getArgument(0);
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 3a60bcf9af8..cac48518f59 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -1258,7 +1258,7 @@ public class CoordinatorRuntimeTest {
100L,
(short) 50,
TXN_OFFSET_COMMIT_LATEST_VERSION
- )).thenReturn(FutureUtils.failedFuture(Errors.NOT_ENOUGH_REPLICAS.exception()));
+ )).thenReturn(CompletableFuture.failedFuture(Errors.NOT_ENOUGH_REPLICAS.exception()));
// Schedule a transactional write.
CompletableFuture<String> future =
runtime.scheduleTransactionalWriteOperation(
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
index eb0058db43e..36eb152fef6 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.junit.jupiter.api.Test;
@@ -135,7 +134,7 @@ public class CoordinatorTimerImplTest {
operationCalled.set(true);
} catch (RejectedExecutionException e) {
rejectedExceptionThrown.set(true);
- return FutureUtils.failedFuture(e);
+ return CompletableFuture.failedFuture(e);
}
return CompletableFuture.completedFuture(null);
};
@@ -180,7 +179,7 @@ public class CoordinatorTimerImplTest {
operation.generate();
} catch (RejectedExecutionException e) {
// Expected for the overridden timer.
- return FutureUtils.failedFuture(e);
+ return CompletableFuture.failedFuture(e);
}
return CompletableFuture.completedFuture(null);
};
@@ -237,7 +236,7 @@ public class CoordinatorTimerImplTest {
var count = callCount.incrementAndGet();
if (count == 1) {
// Fail the first time.
- return FutureUtils.failedFuture(new
RuntimeException("Simulated failure"));
+ return CompletableFuture.failedFuture(new
RuntimeException("Simulated failure"));
}
return CompletableFuture.completedFuture(null);
};
@@ -281,7 +280,7 @@ public class CoordinatorTimerImplTest {
CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
operation.generate();
callCount.incrementAndGet();
- return FutureUtils.failedFuture(new RuntimeException("Simulated failure"));
+ return CompletableFuture.failedFuture(new RuntimeException("Simulated failure"));
};
var timer = new CoordinatorTimerImpl<>(
@@ -322,7 +321,7 @@ public class CoordinatorTimerImplTest {
CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
operation.generate();
callCount.incrementAndGet();
- return FutureUtils.failedFuture(new NotCoordinatorException("Not coordinator"));
+ return CompletableFuture.failedFuture(new NotCoordinatorException("Not coordinator"));
};
var timer = new CoordinatorTimerImpl<>(
@@ -363,7 +362,7 @@ public class CoordinatorTimerImplTest {
CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
operation.generate();
callCount.incrementAndGet();
- return FutureUtils.failedFuture(new
CoordinatorLoadInProgressException("Loading"));
+ return CompletableFuture.failedFuture(new
CoordinatorLoadInProgressException("Loading"));
};
var timer = new CoordinatorTimerImpl<>(
@@ -543,7 +542,7 @@ public class CoordinatorTimerImplTest {
operation.generate();
var count = callCount.incrementAndGet();
if (count == 1) {
- return FutureUtils.failedFuture(new
RuntimeException("Simulated failure"));
+ return CompletableFuture.failedFuture(new
RuntimeException("Simulated failure"));
}
return CompletableFuture.completedFuture(null);
};
@@ -589,7 +588,7 @@ public class CoordinatorTimerImplTest {
// (2) events failing before being executed.
CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
// Don't call operation.generate() - simulates event never being executed
- return FutureUtils.failedFuture(new NotCoordinatorException("Not coordinator"));
+ return CompletableFuture.failedFuture(new NotCoordinatorException("Not coordinator"));
};
var timer = new CoordinatorTimerImpl<>(
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 3150e8e28a6..1f37eadf11b 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -53,7 +53,6 @@ import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.storage.log.FetchParams;
-import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
@@ -362,7 +361,7 @@ public class SharePartitionManager implements AutoCloseable
{
ShareSessionKey key = shareSessionKey(groupId, memberId);
if (cache.remove(key) == null) {
log.error("Share session error for {}: no such share session found", key);
- return FutureUtils.failedFuture(Errors.SHARE_SESSION_NOT_FOUND.exception());
+ return CompletableFuture.failedFuture(Errors.SHARE_SESSION_NOT_FOUND.exception());
} else {
log.debug("Removed share session with key {}", key);
}
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index ef5c6f087a4..d177ac6c9a9 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -75,7 +75,6 @@ import
org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
-import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
@@ -1276,7 +1275,7 @@ public class SharePartitionManagerTest {
SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
when(sp1.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(CompletableFuture.completedFuture(null));
- when(sp2.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(FutureUtils.failedFuture(
+ when(sp2.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(CompletableFuture.failedFuture(
new InvalidRecordStateException("Unable to release acquired records for the batch")
));
@@ -1635,7 +1634,7 @@ public class SharePartitionManagerTest {
TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
SharePartition sp = mock(SharePartition.class);
- when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(FutureUtils.failedFuture(
+ when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.failedFuture(
new InvalidRequestException("Member is not the owner of batch record")
));
SharePartitionCache partitionCache = new SharePartitionCache();
@@ -2352,7 +2351,7 @@ public class SharePartitionManagerTest {
.build();
// Return LeaderNotAvailableException to simulate initialization failure.
- when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new LeaderNotAvailableException("Leader not available")));
+ when(sp0.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new LeaderNotAvailableException("Leader not available")));
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId,
FETCH_PARAMS, BATCH_OPTIMIZED, 0,
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
@@ -2369,7 +2368,7 @@ public class SharePartitionManagerTest {
assertEquals(1, partitionCache.size());
// Return IllegalStateException to simulate initialization failure.
- when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new IllegalStateException("Illegal state")));
+ when(sp0.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new IllegalStateException("Illegal state")));
future = sharePartitionManager.fetchMessages(groupId, memberId,
FETCH_PARAMS, BATCH_OPTIMIZED, 0,
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
@@ -2383,7 +2382,7 @@ public class SharePartitionManagerTest {
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Return CoordinatorNotAvailableException to simulate initialization failure.
- when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new CoordinatorNotAvailableException("Coordinator not available")));
+ when(sp0.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new CoordinatorNotAvailableException("Coordinator not available")));
future = sharePartitionManager.fetchMessages(groupId, memberId,
FETCH_PARAMS, BATCH_OPTIMIZED, 0,
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
@@ -2397,7 +2396,7 @@ public class SharePartitionManagerTest {
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Return InvalidRequestException to simulate initialization failure.
- when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new InvalidRequestException("Invalid request")));
+ when(sp0.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new InvalidRequestException("Invalid request")));
future = sharePartitionManager.fetchMessages(groupId, memberId,
FETCH_PARAMS, BATCH_OPTIMIZED, 0,
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
@@ -2411,7 +2410,7 @@ public class SharePartitionManagerTest {
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Return FencedStateEpochException to simulate initialization failure.
- when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new FencedStateEpochException("Fenced state epoch")));
+ when(sp0.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new FencedStateEpochException("Fenced state epoch")));
future = sharePartitionManager.fetchMessages(groupId, memberId,
FETCH_PARAMS, BATCH_OPTIMIZED, 0,
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
@@ -2425,7 +2424,7 @@ public class SharePartitionManagerTest {
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Return NotLeaderOrFollowerException to simulate initialization failure.
- when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new NotLeaderOrFollowerException("Not leader or follower")));
+ when(sp0.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new NotLeaderOrFollowerException("Not leader or follower")));
future = sharePartitionManager.fetchMessages(groupId, memberId,
FETCH_PARAMS, BATCH_OPTIMIZED, 0,
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
@@ -2439,7 +2438,7 @@ public class SharePartitionManagerTest {
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Return RuntimeException to simulate initialization failure.
- when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new RuntimeException("Runtime exception")));
+ when(sp0.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("Runtime exception")));
future = sharePartitionManager.fetchMessages(groupId, memberId,
FETCH_PARAMS, BATCH_OPTIMIZED, 0,
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index db21a6df814..00c080f695b 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -71,7 +71,6 @@ import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.share.persister.WriteShareGroupStateResult;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
-import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
@@ -909,7 +908,7 @@ public class SharePartitionTest {
public void testMaybeInitializeWithReadException() {
Persister persister = Mockito.mock(Persister.class);
// Complete the future exceptionally for read state.
- Mockito.when(persister.readState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new RuntimeException("Read exception")));
+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.failedFuture(new RuntimeException("Read exception")));
SharePartition sharePartition1 =
SharePartitionBuilder.builder().withPersister(persister).build();
CompletableFuture<Void> result = sharePartition1.maybeInitialize();
@@ -7514,7 +7513,7 @@ public class SharePartitionTest {
mockPersisterReadStateMethod(persister);
SharePartition sharePartition1 =
SharePartitionBuilder.builder().withPersister(persister).build();
- Mockito.when(persister.writeState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new RuntimeException("Write exception")));
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.failedFuture(new RuntimeException("Write exception")));
CompletableFuture<Void> writeResult =
sharePartition1.writeShareGroupState(anyList());
assertTrue(writeResult.isCompletedExceptionally());
assertFutureThrows(IllegalStateException.class, writeResult);
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 23bb4cc413b..d3e3d548029 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -60,7 +60,6 @@ import org.apache.kafka.server.authorizer.{Action,
AuthorizableRequestContext, A
import org.apache.kafka.server.common.{ApiMessageAndVersion,
FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock,
RequestLocal}
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.quota.{ClientQuotaManager,
ControllerMutationQuota, ControllerMutationQuotaManager}
-import org.apache.kafka.server.util.FutureUtils
import org.apache.kafka.storage.internals.log.CleanerConfig
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
@@ -1230,7 +1229,7 @@ class ControllerApisTest {
)))))
.thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
when(controller.assignReplicasToDirs(any[ControllerRequestContext],
ArgumentMatchers.eq(request.data)))
- .thenReturn(FutureUtils.failedFuture[AssignReplicasToDirsResponseData](Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()))
+ .thenReturn(CompletableFuture.failedFuture[AssignReplicasToDirsResponseData](Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()))
val response = handleRequest[AssignReplicasToDirsResponse](request,
controllerApis)
assertEquals(new
AssignReplicasToDirsResponseData().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()),
response.data)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index ca05fbb1b69..ad8f3d47916 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -101,7 +101,7 @@ import
org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.share.context.{FinalContext,
ShareSessionContext}
import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
-import org.apache.kafka.server.util.{FutureUtils, MockTime}
+import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
@@ -3447,7 +3447,7 @@ class KafkaApisTest extends Logging {
ArgumentMatchers.eq(0),
ArgumentMatchers.eq(TransactionResult.COMMIT),
ArgumentMatchers.eq(TransactionVersion.TV_2.featureLevel())
- )).thenReturn(FutureUtils.failedFuture[Void](error.exception()))
+ )).thenReturn(CompletableFuture.failedFuture[Void](error.exception()))
kafkaApis = createKafkaApis()
kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest,
RequestLocal.noCaching)
@@ -5003,7 +5003,7 @@ class KafkaApisTest extends Logging {
val memberId: String = Uuid.randomUuid().toString
when(sharePartitionManager.fetchMessages(any(), any(), any(), any(),
anyInt(), anyInt(), anyInt(), any())).thenReturn(
- FutureUtils.failedFuture[util.Map[TopicIdPartition,
ShareFetchResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
+ CompletableFuture.failedFuture[util.Map[TopicIdPartition,
ShareFetchResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any(), any())).thenReturn(
@@ -5064,7 +5064,7 @@ class KafkaApisTest extends Logging {
)
when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
- FutureUtils.failedFuture[util.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
+ CompletableFuture.failedFuture[util.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
val cachedSharePartitions = new
ImplicitLinkedHashCollection[CachedSharePartition]
@@ -5120,11 +5120,11 @@ class KafkaApisTest extends Logging {
val groupId = "group"
when(sharePartitionManager.fetchMessages(any(), any(), any(), any(),
anyInt(), anyInt(), anyInt(), any())).thenReturn(
- FutureUtils.failedFuture[util.Map[TopicIdPartition,
ShareFetchResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
+ CompletableFuture.failedFuture[util.Map[TopicIdPartition,
ShareFetchResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
- FutureUtils.failedFuture[util.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
+ CompletableFuture.failedFuture[util.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
val cachedSharePartitions = new
ImplicitLinkedHashCollection[CachedSharePartition]
@@ -7143,7 +7143,7 @@ class KafkaApisTest extends Logging {
)
when(sharePartitionManager.releaseSession(any(), any())).thenReturn(
- FutureUtils.failedFuture[util.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
+ CompletableFuture.failedFuture[util.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
@@ -7576,7 +7576,7 @@ class KafkaApisTest extends Logging {
any[Session](), anyString, anyDouble, anyLong)).thenReturn(0)
when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
- FutureUtils.failedFuture[util.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
+ CompletableFuture.failedFuture[util.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
doNothing().when(sharePartitionManager).acknowledgeSessionUpdate(any(),
any(), any())
@@ -7704,7 +7704,7 @@ class KafkaApisTest extends Logging {
doNothing().when(sharePartitionManager).acknowledgeSessionUpdate(any(),
any(), any())
when(sharePartitionManager.releaseSession(any(), any())).thenReturn(
- FutureUtils.failedFuture[util.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
+ CompletableFuture.failedFuture[util.Map[TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
val shareAcknowledgeRequestData = new ShareAcknowledgeRequestData().
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index c3a8e0e4798..7e242d26ca6 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -2178,11 +2178,11 @@ public class GroupCoordinatorService implements
GroupCoordinator {
short transactionVersion
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
if (!tp.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
- return FutureUtils.failedFuture(new IllegalStateException(
+ return CompletableFuture.failedFuture(new IllegalStateException(
"Completing a transaction for " + tp + " is not expected"
));
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 11b0a4dde04..33e5904143a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -113,7 +113,6 @@ import org.apache.kafka.server.share.persister.Persister;
import
org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
import
org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult;
import org.apache.kafka.server.share.persister.TopicData;
-import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.junit.jupiter.api.Test;
@@ -282,7 +281,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("consumer-group-heartbeat"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(exception));
+ )).thenReturn(CompletableFuture.failedFuture(exception));
CompletableFuture<ConsumerGroupHeartbeatResponseData> future =
service.consumerGroupHeartbeat(
requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
@@ -559,7 +558,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("streams-group-heartbeat"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(exception));
+ )).thenReturn(CompletableFuture.failedFuture(exception));
CompletableFuture<StreamsGroupHeartbeatResult> future =
service.streamsGroupHeartbeat(
requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT),
@@ -1077,7 +1076,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("classic-group-join"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(new IllegalStateException()));
+ )).thenReturn(CompletableFuture.failedFuture(new
IllegalStateException()));
CompletableFuture<JoinGroupResponseData> future = service.joinGroup(
requestContext(ApiKeys.JOIN_GROUP),
@@ -1229,7 +1228,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("classic-group-sync"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(new IllegalStateException()));
+ )).thenReturn(CompletableFuture.failedFuture(new
IllegalStateException()));
CompletableFuture<SyncGroupResponseData> future = service.syncGroup(
requestContext(ApiKeys.SYNC_GROUP),
@@ -1337,7 +1336,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("classic-group-heartbeat"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
new CoordinatorLoadInProgressException(null)
));
@@ -1365,7 +1364,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("classic-group-heartbeat"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
new RebalanceInProgressException()
));
@@ -1477,7 +1476,7 @@ public class GroupCoordinatorServiceTest {
)).thenReturn(Arrays.asList(
CompletableFuture.completedFuture(List.of(expectedResults.get(0))),
CompletableFuture.completedFuture(List.of(expectedResults.get(1))),
- FutureUtils.failedFuture(new NotCoordinatorException(""))
+ CompletableFuture.failedFuture(new NotCoordinatorException(""))
));
CompletableFuture<ListGroupsResponseData> responseFuture =
service.listGroups(
@@ -1504,7 +1503,7 @@ public class GroupCoordinatorServiceTest {
)).thenReturn(Arrays.asList(
CompletableFuture.completedFuture(List.of()),
CompletableFuture.completedFuture(List.of()),
- FutureUtils.failedFuture(new
CoordinatorLoadInProgressException(""))
+ CompletableFuture.failedFuture(new
CoordinatorLoadInProgressException(""))
));
CompletableFuture<ListGroupsResponseData> responseFuture =
service.listGroups(
@@ -1652,7 +1651,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("describe-groups"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
new CoordinatorLoadInProgressException(null)
));
@@ -1837,7 +1836,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" :
"fetch-offsets"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(new
CompletionException(error.exception())));
+ )).thenReturn(CompletableFuture.failedFuture(new
CompletionException(error.exception())));
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>
future = service.fetchOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
@@ -1904,7 +1903,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("classic-group-leave"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
new UnknownMemberIdException()
));
@@ -2049,7 +2048,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("consumer-group-describe"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
new CoordinatorLoadInProgressException(null)
));
@@ -2076,7 +2075,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("consumer-group-describe"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
Errors.COORDINATOR_NOT_AVAILABLE.exception()
));
@@ -2187,7 +2186,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("streams-group-describe"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
new CoordinatorLoadInProgressException(null)
));
@@ -2214,7 +2213,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("streams-group-describe"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
Errors.COORDINATOR_NOT_AVAILABLE.exception()
));
@@ -2349,7 +2348,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("delete-offsets"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(exception));
+ )).thenReturn(CompletableFuture.failedFuture(exception));
CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
requestContext(ApiKeys.OFFSET_DELETE),
@@ -2443,7 +2442,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("delete-groups"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
ArgumentMatchers.any()
-
)).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
+
)).thenReturn(CompletableFuture.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3", null);
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
@@ -2819,7 +2818,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("delete-groups"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(exception));
+ )).thenReturn(CompletableFuture.failedFuture(exception));
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
service.deleteGroups(
@@ -3029,7 +3028,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq((short) 5),
ArgumentMatchers.any(),
ArgumentMatchers.eq((int)
ApiKeys.TXN_OFFSET_COMMIT.latestVersion())
- )).thenReturn(FutureUtils.failedFuture(new
CompletionException(error.exception())));
+ )).thenReturn(CompletableFuture.failedFuture(new
CompletionException(error.exception())));
CompletableFuture<TxnOffsetCommitResponseData> future =
service.commitTransactionalOffsets(
requestContext(ApiKeys.TXN_OFFSET_COMMIT),
@@ -3244,7 +3243,7 @@ public class GroupCoordinatorServiceTest {
)).thenReturn(Arrays.asList(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
-
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
+
CompletableFuture.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
));
when(runtime.scheduleWriteAllOperation(
@@ -3253,7 +3252,7 @@ public class GroupCoordinatorServiceTest {
)).thenReturn(Arrays.asList(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
-
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
+
CompletableFuture.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
));
// Verify no exception thrown.
@@ -3345,7 +3344,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("share-group-heartbeat"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(exception));
+ )).thenReturn(CompletableFuture.failedFuture(exception));
CompletableFuture<ShareGroupHeartbeatResponseData> future =
service.shareGroupHeartbeat(
requestContext(ApiKeys.SHARE_GROUP_HEARTBEAT),
@@ -3563,7 +3562,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
new CoordinatorLoadInProgressException(null)
));
@@ -3591,7 +3590,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
Errors.COORDINATOR_NOT_AVAILABLE.exception()
));
@@ -4754,7 +4753,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
-
)).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
)).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -5321,7 +5320,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("complete-delete-share-group-offsets"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
-
)).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
)).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -5820,7 +5819,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq("share-group-offsets-alter"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(FutureUtils.failedFuture(
+ )).thenReturn(CompletableFuture.failedFuture(
new GroupNotEmptyException("bad stuff")
));
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
index ddef8d59efe..ea6e6efc0f7 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
@@ -93,18 +93,6 @@ public class FutureUtils {
});
}
- /**
- * Returns a new CompletableFuture that is already completed exceptionally
with the given exception.
- *
- * @param ex The exception.
- * @return The exceptionally completed CompletableFuture.
- */
- public static <T> CompletableFuture<T> failedFuture(Throwable ex) {
- CompletableFuture<T> future = new CompletableFuture<>();
- future.completeExceptionally(ex);
- return future;
- }
-
/**
* Given a list of CompletableFutures returns a single CompletableFuture
combining them.
*
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 66797199b0b..6c22e91bcf1 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -51,7 +51,6 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.share.SharePartitionKey;
-import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
@@ -1178,7 +1177,7 @@ class ShareCoordinatorServiceTest {
int partition = 0;
when(runtime.scheduleWriteOperation(any(), any(), any()))
-
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
+
.thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
assertEquals(new WriteShareGroupStateResponseData()
.setResults(List.of(new
WriteShareGroupStateResponseData.WriteStateResult()
@@ -1230,7 +1229,7 @@ class ShareCoordinatorServiceTest {
int partition = 0;
when(runtime.scheduleWriteOperation(any(), any(), any()))
-
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
.thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
assertEquals(new ReadShareGroupStateResponseData()
.setResults(List.of(new
ReadShareGroupStateResponseData.ReadStateResult()
@@ -1274,7 +1273,7 @@ class ShareCoordinatorServiceTest {
int partition = 0;
when(runtime.scheduleWriteOperation(any(), any(), any()))
-
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
.thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
assertEquals(new ReadShareGroupStateSummaryResponseData()
.setResults(List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
@@ -1318,7 +1317,7 @@ class ShareCoordinatorServiceTest {
int partition = 0;
when(runtime.scheduleWriteOperation(any(), any(), any()))
-
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
+
.thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
assertEquals(new DeleteShareGroupStateResponseData()
.setResults(List.of(new
DeleteShareGroupStateResponseData.DeleteStateResult()
@@ -1360,7 +1359,7 @@ class ShareCoordinatorServiceTest {
Uuid topicId = Uuid.randomUuid();
int partition = 0;
- when(runtime.scheduleWriteOperation(any(), any(),
any())).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
+ when(runtime.scheduleWriteOperation(any(), any(),
any())).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
assertEquals(
new InitializeShareGroupStateResponseData().setResults(List.of(new
InitializeShareGroupStateResponseData.InitializeStateResult()
@@ -2287,7 +2286,7 @@ class ShareCoordinatorServiceTest {
List.of(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
-
FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
+
CompletableFuture.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
)
);