This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1834030107b KAFKA-17510: Exception handling and purgatory completion on initialization delay (#17709)
1834030107b is described below
commit 1834030107b93fe2c08fcb48451f14b0d764051d
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Nov 14 15:44:42 2024 +0000
KAFKA-17510: Exception handling and purgatory completion on initialization delay (#17709)
Reviewers: Jun Rao <[email protected]>
---
.../java/kafka/server/share/DelayedShareFetch.java | 87 ++++++---
.../java/kafka/server/share/ShareFetchUtils.java | 12 +-
.../java/kafka/server/share/SharePartition.java | 4 +-
.../kafka/server/share/SharePartitionManager.java | 136 +++++---------
.../kafka/server/share/DelayedShareFetchTest.java | 107 ++++++-----
.../kafka/server/share/ShareFetchUtilsTest.java | 26 +--
.../server/share/SharePartitionManagerTest.java | 128 +++++++++++--
.../server/ShareFetchAcknowledgeRequestTest.scala | 78 ++++++--
.../kafka/server/share/fetch/ShareFetch.java | 202 +++++++++++++++++++++
.../kafka/server/share/fetch/ShareFetchData.java | 78 --------
.../kafka/server/share/fetch/ShareFetchTest.java | 90 +++++++++
11 files changed, 658 insertions(+), 290 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 4cb9ce0cf42..a5cd79ee358 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -25,8 +25,9 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.purgatory.DelayedOperation;
+import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@@ -55,7 +56,7 @@ public class DelayedShareFetch extends DelayedOperation {
private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);
- private final ShareFetchData shareFetchData;
+ private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;
private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
@@ -66,12 +67,12 @@ public class DelayedShareFetch extends DelayedOperation {
private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
DelayedShareFetch(
- ShareFetchData shareFetchData,
+ ShareFetch shareFetch,
ReplicaManager replicaManager,
SharePartitionManager sharePartitionManager,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
- super(shareFetchData.fetchParams().maxWaitMs, Optional.empty());
- this.shareFetchData = shareFetchData;
+ super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
+ this.shareFetch = shareFetch;
this.replicaManager = replicaManager;
this.partitionsAcquired = new LinkedHashMap<>();
this.partitionsAlreadyFetched = new LinkedHashMap<>();
@@ -91,10 +92,10 @@ public class DelayedShareFetch extends DelayedOperation {
@Override
public void onComplete() {
log.trace("Completing the delayed share fetch request for group {},
member {}, "
- + "topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(),
+ + "topic partitions {}", shareFetch.groupId(),
shareFetch.memberId(),
partitionsAcquired.keySet());
- if (shareFetchData.future().isDone())
+ if (shareFetch.isCompleted())
return;
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
@@ -107,11 +108,11 @@ public class DelayedShareFetch extends DelayedOperation {
if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete the request with an empty response.
- shareFetchData.future().complete(Collections.emptyMap());
+ shareFetch.maybeComplete(Collections.emptyMap());
return;
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch
params: {}",
- topicPartitionData, shareFetchData.groupId(),
shareFetchData.fetchParams());
+ topicPartitionData, shareFetch.groupId(),
shareFetch.fetchParams());
try {
Map<TopicIdPartition, LogReadResult> responseData;
@@ -126,11 +127,11 @@ public class DelayedShareFetch extends DelayedOperation {
for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet())
fetchPartitionsData.put(entry.getKey(), entry.getValue().toFetchPartitionData(false));
- shareFetchData.future().complete(ShareFetchUtils.processFetchResponse(shareFetchData, fetchPartitionsData,
+ shareFetch.maybeComplete(ShareFetchUtils.processFetchResponse(shareFetch, fetchPartitionsData,
sharePartitions, replicaManager));
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
- sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e);
+ handleFetchException(shareFetch, topicPartitionData.keySet(), e);
} finally {
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(topicPartitionData.keySet());
@@ -140,7 +141,7 @@ public class DelayedShareFetch extends DelayedOperation {
// we directly call delayedShareFetchPurgatory.checkAndComplete
replicaManager.addToActionQueue(() -> topicPartitionData.keySet().forEach(topicIdPartition ->
replicaManager.completeDelayedShareFetchRequest(
- new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+ new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
}
}
@@ -170,13 +171,13 @@ public class DelayedShareFetch extends DelayedOperation {
return completedByMe;
} else {
log.debug("minBytes is not satisfied for the share fetch
request for group {}, member {}, " +
- "topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(),
+ "topic partitions {}", shareFetch.groupId(),
shareFetch.memberId(),
sharePartitions.keySet());
releasePartitionLocks(topicPartitionData.keySet());
}
} else {
log.trace("Can't acquire records for any partition in the
share fetch request for group {}, member {}, " +
- "topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(),
+ "topic partitions {}", shareFetch.groupId(),
shareFetch.memberId(),
sharePartitions.keySet());
}
return false;
@@ -198,7 +199,7 @@ public class DelayedShareFetch extends DelayedOperation {
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
sharePartitions.forEach((topicIdPartition, sharePartition) -> {
- int partitionMaxBytes = shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
+ int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
// Add the share partition to the list of partitions to be fetched only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
@@ -266,7 +267,16 @@ public class DelayedShareFetch extends DelayedOperation {
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
FetchRequest.PartitionData partitionData = entry.getValue();
- LogOffsetMetadata endOffsetMetadata = endOffsetMetadataForTopicPartition(topicIdPartition);
+
+ LogOffsetMetadata endOffsetMetadata;
+ try {
+ endOffsetMetadata = endOffsetMetadataForTopicPartition(topicIdPartition);
+ } catch (Exception e) {
+ shareFetch.addErroneous(topicIdPartition, e);
+ sharePartitionManager.handleFencedSharePartitionException(
+ new SharePartitionKey(shareFetch.groupId(), topicIdPartition), e);
+ continue;
+ }
if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
continue;
@@ -280,14 +290,14 @@ public class DelayedShareFetch extends DelayedOperation {
if (fetchOffsetMetadata.messageOffset > endOffsetMetadata.messageOffset) {
log.debug("Satisfying delayed share fetch request for group {}, member {} since it is fetching later segments of " +
- "topicIdPartition {}", shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition);
+ "topicIdPartition {}", shareFetch.groupId(), shareFetch.memberId(), topicIdPartition);
return true;
} else if (fetchOffsetMetadata.messageOffset < endOffsetMetadata.messageOffset) {
if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
// This can happen when the fetch operation is falling behind the current segment or the partition
// has just rolled a new segment.
log.debug("Satisfying delayed share fetch request for
group {}, member {} immediately since it is fetching older " +
- "segments of topicIdPartition {}",
shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition);
+ "segments of topicIdPartition {}",
shareFetch.groupId(), shareFetch.memberId(), topicIdPartition);
return true;
} else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
// we take the partition fetch size as upper bound when accumulating the bytes.
@@ -296,15 +306,15 @@ public class DelayedShareFetch extends DelayedOperation {
}
}
}
- return accumulatedSize >= shareFetchData.fetchParams().minBytes;
+ return accumulatedSize >= shareFetch.fetchParams().minBytes;
}
private LogOffsetMetadata endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
- Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+ Partition partition = ShareFetchUtils.partition(replicaManager, topicIdPartition.topicPartition());
LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true);
// The FetchIsolation type that we use for share fetch is FetchIsolation.HIGH_WATERMARK. In the future, we can
// extend it to support other FetchIsolation types.
- FetchIsolation isolationType = shareFetchData.fetchParams().isolation;
+ FetchIsolation isolationType = shareFetch.fetchParams().isolation;
if (isolationType == FetchIsolation.LOG_END)
return offsetSnapshot.logEndOffset;
else if (isolationType == FetchIsolation.HIGH_WATERMARK)
@@ -315,11 +325,17 @@ public class DelayedShareFetch extends DelayedOperation {
}
private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+ // Filter if there already exists any erroneous topic partition.
+ Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
+ if (partitionsToFetch.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
- shareFetchData.fetchParams(),
+ shareFetch.fetchParams(),
CollectionConverters.asScala(
- topicPartitionData.entrySet().stream().map(entry ->
- new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
+ partitionsToFetch.stream().map(topicIdPartition ->
+ new Tuple2<>(topicIdPartition, topicPartitionData.get(topicIdPartition))).collect(Collectors.toList())
),
QuotaFactory.UNBOUNDED_QUOTA,
true);
@@ -339,6 +355,29 @@ public class DelayedShareFetch extends DelayedOperation {
.anyMatch(logReadResult -> logReadResult.error().code() != Errors.NONE.code());
}
+ /**
+ * The handleFetchException method is used to handle the exception that occurred while reading from log.
+ * The method will handle the exception for each topic-partition in the request. The share partition
+ * might get removed from the cache.
+ * <p>
+ * The replica read request might error out for one share partition
+ * but as we cannot determine which share partition errored out, we might remove all the share partitions
+ * in the request.
+ *
+ * @param shareFetch The share fetch request.
+ * @param topicIdPartitions The topic-partitions in the replica read request.
+ * @param throwable The exception that occurred while fetching messages.
+ */
+ private void handleFetchException(
+ ShareFetch shareFetch,
+ Set<TopicIdPartition> topicIdPartitions,
+ Throwable throwable
+ ) {
+ topicIdPartitions.forEach(topicIdPartition -> sharePartitionManager.handleFencedSharePartitionException(
+ new SharePartitionKey(shareFetch.groupId(), topicIdPartition), throwable));
+ shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
+ }
+
// Visible for testing.
Map<TopicIdPartition, LogReadResult> combineLogReadResponse(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
Map<TopicIdPartition, LogReadResult> existingFetchedData) {
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index 3515362152b..e3608128eb5 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class ShareFetchUtils {
* by acquiring records from the share partition.
*/
static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchResponse(
- ShareFetchData shareFetchData,
+ ShareFetch shareFetch,
Map<TopicIdPartition, FetchPartitionData> responseData,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
ReplicaManager replicaManager
@@ -91,7 +91,7 @@ public class ShareFetchUtils {
partitionData.setErrorMessage(Errors.NONE.message());
}
} else {
- ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetchData.memberId(), shareFetchData.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
+ ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetch.memberId(), shareFetch.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
log.trace("Acquired records: {} for topicIdPartition: {}", shareAcquiredRecords, topicIdPartition);
// Maybe, in the future, check if no records are acquired, and we want to retry
// replica manager fetch. Depends on the share partition manager implementation,
@@ -151,11 +151,15 @@ public class ShareFetchUtils {
}
static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
+ return partition(replicaManager, tp).getLeaderEpoch();
+ }
+
+ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
Partition partition = replicaManager.getPartitionOrException(tp);
if (!partition.isLeader()) {
log.debug("The broker is not the leader for topic partition:
{}-{}", tp.topic(), tp.partition());
throw new NotLeaderOrFollowerException();
}
- return partition.getLeaderEpoch();
+ return partition;
}
}
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 71baea10174..632cb1e3169 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1082,8 +1082,8 @@ public class SharePartition {
/**
* Prior to fetching records from the leader, the fetch lock is acquired to ensure that the same
- * share partition does not enter a fetch queue while another one is being fetched within the queue.
- * The fetch lock is released once the records are fetched from the leader.
+ * share partition is not fetched concurrently by multiple clients. The fetch lock is released once
+ * the records are fetched and acquired.
*
* @return A boolean which indicates whether the fetch lock is acquired.
*/
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 4288dd55703..1c6c5492372 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -49,7 +49,7 @@ import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
@@ -71,10 +71,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
/**
* The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
@@ -250,7 +248,7 @@ public class SharePartitionManager implements AutoCloseable {
partitionMaxBytes.keySet(), groupId, fetchParams);
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
- processShareFetch(new ShareFetchData(fetchParams, groupId, memberId, future, partitionMaxBytes, maxFetchRecords));
+ processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, maxFetchRecords));
return future;
}
@@ -498,30 +496,6 @@ public class SharePartitionManager implements AutoCloseable {
}
}
- /**
- * The handleFetchException method is used to handle the exception that occurred while reading from log.
- * The method will handle the exception for each topic-partition in the request. The share partition
- * might get removed from the cache.
- * <p>
- * The replica read request might error out for one share partition
- * but as we cannot determine which share partition errored out, we might remove all the share partitions
- * in the request.
- *
- * @param groupId The group id in the share fetch request.
- * @param topicIdPartitions The topic-partitions in the replica read request.
- * @param future The future to complete with the exception.
- * @param throwable The exception that occurred while fetching messages.
- */
- public void handleFetchException(
- String groupId,
- Set<TopicIdPartition> topicIdPartitions,
- CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
- Throwable throwable
- ) {
- topicIdPartitions.forEach(topicIdPartition -> handleFencedSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable));
- maybeCompleteShareFetchWithException(future, topicIdPartitions, throwable);
- }
-
/**
* The cachedTopicIdPartitionsInShareSession method is used to get the cached topic-partitions in the share session.
*
@@ -564,20 +538,18 @@ public class SharePartitionManager implements AutoCloseable {
}
// Visible for testing.
- void processShareFetch(ShareFetchData shareFetchData) {
- if (shareFetchData.partitionMaxBytes().isEmpty()) {
+ void processShareFetch(ShareFetch shareFetch) {
+ if (shareFetch.partitionMaxBytes().isEmpty()) {
// If there are no partitions to fetch then complete the future with an empty map.
- shareFetchData.future().complete(Collections.emptyMap());
+ shareFetch.maybeComplete(Collections.emptyMap());
return;
}
- // Initialize lazily, if required.
- Map<TopicIdPartition, Throwable> erroneous = null;
List<DelayedShareFetchKey> delayedShareFetchWatchKeys = new ArrayList<>();
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
- for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) {
+ for (TopicIdPartition topicIdPartition : shareFetch.partitionMaxBytes().keySet()) {
SharePartitionKey sharePartitionKey = sharePartitionKey(
- shareFetchData.groupId(),
+ shareFetch.groupId(),
topicIdPartition
);
@@ -585,15 +557,8 @@ public class SharePartitionManager implements AutoCloseable {
try {
sharePartition = getOrCreateSharePartition(sharePartitionKey);
} catch (Exception e) {
- // Complete the whole fetch request with an exception if there is an error processing.
- // The exception currently can be thrown only if there is an error while initializing
- // the share partition. But skip the processing for other share partitions in the request
- // as this situation is not expected.
- log.error("Error processing share fetch request", e);
- if (erroneous == null) {
- erroneous = new HashMap<>();
- }
- erroneous.put(topicIdPartition, e);
+ log.debug("Error processing share fetch request", e);
+ shareFetch.addErroneous(topicIdPartition, e);
// Continue iteration for other partitions in the request.
continue;
}
@@ -601,37 +566,42 @@ public class SharePartitionManager implements AutoCloseable {
// We add a key corresponding to each share partition in the request in the group so that when there are
// acknowledgements/acquisition lock timeout etc., we have a way to perform checkAndComplete for all
// such requests which are delayed because of lack of data to acquire for the share partition.
- delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
+ DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(shareFetch.groupId(),
+ topicIdPartition.topicId(), topicIdPartition.partition());
+ delayedShareFetchWatchKeys.add(delayedShareFetchKey);
// We add a key corresponding to each topic partition in the request so that when the HWM is updated
// for any topic partition, we have a way to perform checkAndComplete for all such requests which are
// delayed because of lack of data to acquire for the topic partition.
delayedShareFetchWatchKeys.add(new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition()));
- // The share partition is initialized asynchronously, so we need to wait for it to be initialized.
- // But if the share partition is already initialized, then the future will be completed immediately.
- // Hence, it's safe to call the maybeInitialize method and then wait for the future to be completed.
- // TopicPartitionData list will be populated only if the share partition is already initialized.
- sharePartition.maybeInitialize().whenComplete((result, throwable) -> {
+
+ CompletableFuture<Void> initializationFuture = sharePartition.maybeInitialize();
+ final boolean initialized = initializationFuture.isDone();
+ initializationFuture.whenComplete((result, throwable) -> {
if (throwable != null) {
- // TODO: Complete error handling for initialization. We have to record the error
- // for respective share partition as completing the full request might result in
- // some acquired records to not being sent: https://issues.apache.org/jira/browse/KAFKA-17510
- maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable);
+ handleInitializationException(sharePartitionKey, shareFetch, throwable);
}
+ // Though the share partition is initialized asynchronously, but if already initialized or
+ // errored then future should be completed immediately. If the initialization is not completed
+ // immediately then the requests might be waiting in purgatory until the share partition
+ // is initialized. Hence, trigger the completion of all pending delayed share fetch requests
+ // for the share partition.
+ if (!initialized)
+ replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
});
sharePartitions.put(topicIdPartition, sharePartition);
}
// If all the partitions in the request errored out, then complete the fetch request with an exception.
- if (erroneous != null && erroneous.size() == shareFetchData.partitionMaxBytes().size()) {
- completeShareFetchWithException(shareFetchData.future(), erroneous);
+ if (shareFetch.errorInAllPartitions()) {
+ shareFetch.maybeComplete(Collections.emptyMap());
// Do not proceed with share fetch processing as all the partitions errored out.
return;
}
- // TODO: If there exists some erroneous partitions then they will not be part of response.
-
// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
- addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this, sharePartitions), delayedShareFetchWatchKeys);
+ // The request will be added irrespective of whether the share partition is initialized or not.
+ // Once the share partition is initialized, the delayed share fetch will be completed.
+ addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, this, sharePartitions), delayedShareFetchWatchKeys);
}
private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
@@ -657,28 +627,35 @@ public class SharePartitionManager implements AutoCloseable {
});
}
- private void maybeCompleteInitializationWithException(
+ private void handleInitializationException(
SharePartitionKey sharePartitionKey,
- CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+ ShareFetch shareFetch,
Throwable throwable) {
if (throwable instanceof LeaderNotAvailableException) {
log.debug("The share partition with key {} is not initialized
yet", sharePartitionKey);
- // Do not process the fetch request for this partition as the
leader is not initialized yet.
- // The fetch request will be retried in the next poll.
- // TODO: Add the request to delayed fetch purgatory.
+ // Skip any handling for this error as the share partition is
still loading. The request
+ // to fetch will be added in purgatory and will be completed once
either timed out
+ // or the share partition initialization completes.
return;
}
// Remove the partition from the cache as it's failed to initialize.
- partitionCacheMap.remove(sharePartitionKey);
- // The partition initialization failed, so complete the request with the exception.
- // The server should not be in this state, so log the error on broker and surface the same
- // to the client. The broker should not be in this state, investigate the root cause of the error.
- log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
- maybeCompleteShareFetchWithException(future, Collections.singletonList(sharePartitionKey.topicIdPartition()), throwable);
+ SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey);
+ if (sharePartition != null) {
+ sharePartition.markFenced();
+ }
+ // The partition initialization failed, so add the partition to the erroneous partitions.
+ log.debug("Error initializing share partition with key {}", sharePartitionKey, throwable);
+ shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), throwable);
}
- private void handleFencedSharePartitionException(
+ /**
+ * The method is used to handle the share partition exception.
+ *
+ * @param sharePartitionKey The share partition key.
+ * @param throwable The exception.
+ */
+ public void handleFencedSharePartitionException(
SharePartitionKey sharePartitionKey,
Throwable throwable
) {
@@ -695,23 +672,6 @@ public class SharePartitionManager implements AutoCloseable {
}
}
- private void maybeCompleteShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
- Collection<TopicIdPartition> topicIdPartitions, Throwable throwable) {
- if (!future.isDone()) {
- future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
- tp -> tp, tp -> new PartitionData().setErrorCode(Errors.forException(throwable).code()).setErrorMessage(throwable.getMessage()))));
- }
- }
-
- private void completeShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
- Map<TopicIdPartition, Throwable> erroneous) {
- future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(
- Map.Entry::getKey, entry -> {
- Throwable t = entry.getValue();
- return new PartitionData().setErrorCode(Errors.forException(t).code()).setErrorMessage(t.getMessage());
- })));
- }
-
private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
return new SharePartitionKey(groupId, topicIdPartition);
}
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 0c7b488f180..f1f708f9dd9 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -113,13 +113,13 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0);
sharePartitions.put(tp1, sp1);
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.build());
@@ -150,7 +150,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0);
sharePartitions.put(tp1, sp1);
- ShareFetchData shareFetchData = new ShareFetchData(
+ ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
@@ -172,7 +172,7 @@ public class DelayedShareFetchTest {
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.build());
@@ -205,7 +205,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0);
sharePartitions.put(tp1, sp1);
- ShareFetchData shareFetchData = new ShareFetchData(
+ ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
@@ -223,7 +223,7 @@ public class DelayedShareFetchTest {
mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.build());
@@ -256,7 +256,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0);
sharePartitions.put(tp1, sp1);
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
when(sp0.canAcquireRecords()).thenReturn(true);
@@ -268,7 +268,7 @@ public class DelayedShareFetchTest {
when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.build());
@@ -301,13 +301,14 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0);
sharePartitions.put(tp1, sp1);
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
- new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
+ CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ future, partitionMaxBytes, MAX_FETCH_RECORDS);
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
.build());
@@ -315,7 +316,7 @@ public class DelayedShareFetchTest {
delayedShareFetch.forceComplete();
// Since no partition could be acquired, the future should be empty and replicaManager.readFromLog should not be called.
- assertEquals(0, shareFetchData.future().join().size());
+ assertEquals(0, future.join().size());
Mockito.verify(replicaManager, times(0)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
assertTrue(delayedShareFetch.isCompleted());
@@ -343,7 +344,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0);
sharePartitions.put(tp1, sp1);
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
when(sp0.canAcquireRecords()).thenReturn(true);
@@ -352,7 +353,7 @@ public class DelayedShareFetchTest {
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
.build());
@@ -365,7 +366,7 @@ public class DelayedShareFetchTest {
Mockito.verify(sp0, times(1)).nextFetchOffset();
Mockito.verify(sp1, times(0)).nextFetchOffset();
assertTrue(delayedShareFetch.isCompleted());
- assertTrue(shareFetchData.future().isDone());
+ assertTrue(shareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
}
@@ -384,14 +385,14 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0);
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, MAX_FETCH_RECORDS);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
.build());
@@ -402,7 +403,7 @@ public class DelayedShareFetchTest {
assertTrue(delayedShareFetch.isCompleted());
// Verifying that the first forceComplete calls acquirablePartitions method in DelayedShareFetch.
Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
- assertEquals(0, shareFetchData.future().join().size());
+ assertEquals(0, future.join().size());
// Force completing the share fetch request for the second time should hit the future completion check and not
// proceed ahead in the function.
@@ -438,7 +439,7 @@ public class DelayedShareFetchTest {
sharePartitions1.put(tp1, sp1);
sharePartitions1.put(tp2, sp2);
- ShareFetchData shareFetchData1 = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ ShareFetch shareFetch1 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes1, MAX_FETCH_RECORDS);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@@ -450,7 +451,7 @@ public class DelayedShareFetchTest {
partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData1)
+ .withShareFetchData(shareFetch1)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions1)
.build();
@@ -460,12 +461,12 @@ public class DelayedShareFetchTest {
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, delayedShareFetchWatchKeys);
assertEquals(2, delayedShareFetchPurgatory.watched());
- assertFalse(shareFetchData1.future().isDone());
+ assertFalse(shareFetch1.isCompleted());
Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES);
- ShareFetchData shareFetchData2 = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ ShareFetch shareFetch2 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes2, MAX_FETCH_RECORDS);
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -476,7 +477,7 @@ public class DelayedShareFetchTest {
sharePartitions2.put(tp2, sp2);
DelayedShareFetch delayedShareFetch2 = spy(DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData2)
+ .withShareFetchData(shareFetch2)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions2)
.build());
@@ -491,7 +492,7 @@ public class DelayedShareFetchTest {
// requests, it should add a "check and complete" action for request
key tp1 on the purgatory.
delayedShareFetch2.forceComplete();
assertTrue(delayedShareFetch2.isCompleted());
- assertTrue(shareFetchData2.future().isDone());
+ assertTrue(shareFetch2.isCompleted());
Mockito.verify(replicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
assertFalse(delayedShareFetch1.isCompleted());
@@ -518,13 +519,13 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp1, sp1);
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
- ShareFetchData shareFetchData = new ShareFetchData(
+ ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, MAX_FETCH_RECORDS);
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
.build();
@@ -568,7 +569,7 @@ public class DelayedShareFetchTest {
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
sharePartitions.put(tp0, sp0);
- ShareFetchData shareFetchData = new ShareFetchData(
+ ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
@@ -583,20 +584,35 @@ public class DelayedShareFetchTest {
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(partition);
when(partition.fetchOffsetSnapshot(any(), anyBoolean())).thenThrow(new RuntimeException("Exception thrown"));
+ SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
+ .withSharePartitionManager(sharePartitionManager)
.build());
+
+ // Try complete should return false as the share partition has errored out.
+ assertFalse(delayedShareFetch.tryComplete());
+ // Fetch should remain pending and should be completed on request timeout.
assertFalse(delayedShareFetch.isCompleted());
+ // The request should be errored out as topic partition should get added as erroneous.
+ assertTrue(shareFetch.errorInAllPartitions());
- // Since minBytes calculation throws an exception and returns true, tryComplete should return true.
- assertTrue(delayedShareFetch.tryComplete());
+ Mockito.verify(sharePartitionManager, times(1)).handleFencedSharePartitionException(any(), any());
+ Mockito.verify(replicaManager, times(1)).readFromLog(
+ any(), any(), any(ReplicaQuota.class), anyBoolean());
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
+
+ // Force complete the request as it's still pending. Return false from the share partition lock acquire.
+ when(sp0.maybeAcquireFetchLock()).thenReturn(false);
+ assertTrue(delayedShareFetch.forceComplete());
assertTrue(delayedShareFetch.isCompleted());
- Mockito.verify(replicaManager, times(2)).readFromLog(
+
+ // Read from log and release partition locks should not be called as the request is errored out.
+ Mockito.verify(replicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
- // releasePartitionLocks will be called twice, once from tryComplete and then from onComplete.
- Mockito.verify(delayedShareFetch, times(2)).releasePartitionLocks(any());
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
}
@Test
@@ -615,11 +631,11 @@ public class DelayedShareFetchTest {
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), MAX_FETCH_RECORDS);
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions1)
.withReplicaManager(replicaManager)
.build();
@@ -643,11 +659,11 @@ public class DelayedShareFetchTest {
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
sharePartitions.put(tp0, sp0);
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), MAX_FETCH_RECORDS);
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.build();
@@ -675,13 +691,13 @@ public class DelayedShareFetchTest {
}
static class DelayedShareFetchBuilder {
- ShareFetchData shareFetchData = mock(ShareFetchData.class);
+ ShareFetch shareFetch = mock(ShareFetch.class);
private ReplicaManager replicaManager = mock(ReplicaManager.class);
- private final SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class);
+ private SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class);
private LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = mock(LinkedHashMap.class);
- DelayedShareFetchBuilder withShareFetchData(ShareFetchData shareFetchData) {
- this.shareFetchData = shareFetchData;
+ DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) {
+ this.shareFetch = shareFetch;
return this;
}
@@ -690,6 +706,11 @@ public class DelayedShareFetchTest {
return this;
}
+ DelayedShareFetchBuilder withSharePartitionManager(SharePartitionManager sharePartitionManager) {
+ this.sharePartitionManager = sharePartitionManager;
+ return this;
+ }
+
DelayedShareFetchBuilder withSharePartitions(LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
this.sharePartitions = sharePartitions;
return this;
@@ -701,7 +722,7 @@ public class DelayedShareFetchTest {
public DelayedShareFetch build() {
return new DelayedShareFetch(
- shareFetchData,
+ shareFetch,
replicaManager,
sharePartitionManager,
sharePartitions);
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 6ff0f90bc49..f8a53e9aa33 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -101,7 +101,7 @@ public class ShareFetchUtilsTest {
sharePartitions.put(tp0, sp0);
sharePartitions.put(tp1, sp1);
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, memberId,
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes, 100);
MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
@@ -124,7 +124,7 @@ public class ShareFetchUtilsTest {
records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData =
- ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitions, mock(ReplicaManager.class));
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, mock(ReplicaManager.class));
assertEquals(2, resultData.size());
assertTrue(resultData.containsKey(tp0));
@@ -167,7 +167,7 @@ public class ShareFetchUtilsTest {
sharePartitions.put(tp0, sp0);
sharePartitions.put(tp1, sp1);
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, memberId,
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes, 100);
Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
@@ -178,7 +178,7 @@ public class ShareFetchUtilsTest {
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData =
- ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitions, mock(ReplicaManager.class));
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, mock(ReplicaManager.class));
assertEquals(2, resultData.size());
assertTrue(resultData.containsKey(tp0));
@@ -209,7 +209,7 @@ public class ShareFetchUtilsTest {
sharePartitions.put(tp0, sp0);
sharePartitions.put(tp1, sp1);
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, 100);
ReplicaManager replicaManager = mock(ReplicaManager.class);
@@ -247,7 +247,7 @@ public class ShareFetchUtilsTest {
records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData1 =
- ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, sharePartitions, replicaManager);
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData1, sharePartitions, replicaManager);
assertEquals(2, resultData1.size());
assertTrue(resultData1.containsKey(tp0));
@@ -276,7 +276,7 @@ public class ShareFetchUtilsTest {
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData2 =
- ShareFetchUtils.processFetchResponse(shareFetchData, responseData2, sharePartitions, replicaManager);
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData2, sharePartitions, replicaManager);
assertEquals(2, resultData2.size());
assertTrue(resultData2.containsKey(tp0));
@@ -303,7 +303,7 @@ public class ShareFetchUtilsTest {
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
sharePartitions.put(tp0, sp0);
- ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, 100);
ReplicaManager replicaManager = mock(ReplicaManager.class);
@@ -327,7 +327,7 @@ public class ShareFetchUtilsTest {
OptionalInt.empty(), false));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData =
- ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitions, replicaManager);
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, replicaManager);
assertEquals(1, resultData.size());
assertTrue(resultData.containsKey(tp0));
@@ -342,7 +342,7 @@ public class ShareFetchUtilsTest {
records, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- resultData = ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitions, replicaManager);
+ resultData = ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, replicaManager);
assertEquals(1, resultData.size());
assertTrue(resultData.containsKey(tp0));
@@ -376,7 +376,7 @@ public class ShareFetchUtilsTest {
Uuid memberId = Uuid.randomUuid();
// Set max fetch records to 10
- ShareFetchData shareFetchData = new ShareFetchData(
+ ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()),
groupId, memberId.toString(), new CompletableFuture<>(), partitionMaxBytes, 10);
@@ -413,7 +413,7 @@ public class ShareFetchUtilsTest {
responseData1.put(tp1, fetchPartitionData2);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData1 =
- ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, sharePartitions, replicaManager);
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData1, sharePartitions, replicaManager);
assertEquals(2, resultData1.size());
assertTrue(resultData1.containsKey(tp0));
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 46abf04b0a6..fbbccadff8b 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -65,7 +65,7 @@ import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
@@ -223,7 +223,7 @@ public class SharePartitionManagerTest {
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH);
- // shareFetchData is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements.
+ // shareFetch is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements.
// New context should be created successfully
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData3 = Collections.singletonMap(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)),
new ShareFetchRequest.SharePartitionData(tpId1, 0));
@@ -257,7 +257,7 @@ public class SharePartitionManagerTest {
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH);
- // shareFetchData is not empty and the maxBytes of topic partition is not 0, which means this is trying to fetch on a Final request.
+ // shareFetch is not empty and the maxBytes of topic partition is not 0, which means this is trying to fetch on a Final request.
// New context should throw an error
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData3 = Collections.singletonMap(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)),
new ShareFetchRequest.SharePartitionData(tpId1, PARTITION_MAX_BYTES));
@@ -1665,7 +1665,7 @@ public class SharePartitionManagerTest {
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
- ShareFetchData shareFetchData = new ShareFetchData(
+ ShareFetch shareFetch = new ShareFetch(
FETCH_PARAMS,
groupId,
Uuid.randomUuid().toString(),
@@ -1700,7 +1700,7 @@ public class SharePartitionManagerTest {
sharePartitions.put(tp2, sp2);
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withReplicaManager(mockReplicaManager)
.withSharePartitions(sharePartitions)
.build();
@@ -1765,7 +1765,7 @@ public class SharePartitionManagerTest {
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
- ShareFetchData shareFetchData = new ShareFetchData(
+ ShareFetch shareFetch = new ShareFetch(
FETCH_PARAMS,
groupId,
Uuid.randomUuid().toString(),
@@ -1801,7 +1801,7 @@ public class SharePartitionManagerTest {
sharePartitions.put(tp3, sp3);
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withReplicaManager(mockReplicaManager)
.withSharePartitions(sharePartitions)
.build();
@@ -1861,7 +1861,7 @@ public class SharePartitionManagerTest {
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
- ShareFetchData shareFetchData = new ShareFetchData(
+ ShareFetch shareFetch = new ShareFetch(
FETCH_PARAMS,
groupId,
Uuid.randomUuid().toString(),
@@ -1897,7 +1897,7 @@ public class SharePartitionManagerTest {
sharePartitions.put(tp2, sp2);
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withReplicaManager(mockReplicaManager)
.withSharePartitions(sharePartitions)
.build();
@@ -1965,7 +1965,7 @@ public class SharePartitionManagerTest {
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
- ShareFetchData shareFetchData = new ShareFetchData(
+ ShareFetch shareFetch = new ShareFetch(
FETCH_PARAMS,
groupId,
Uuid.randomUuid().toString(),
@@ -2002,7 +2002,7 @@ public class SharePartitionManagerTest {
sharePartitions.put(tp3, sp3);
DelayedShareFetch delayedShareFetch =
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchData(shareFetchData)
+ .withShareFetchData(shareFetch)
.withReplicaManager(mockReplicaManager)
.withSharePartitions(sharePartitions)
.build();
@@ -2063,10 +2063,74 @@ public class SharePartitionManagerTest {
// Verify that replica manager fetch is not called.
Mockito.verify(mockReplicaManager, times(0)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
+ assertFalse(pendingInitializationFuture.isDone());
// Complete the pending initialization future.
pendingInitializationFuture.complete(null);
}
+ @Test
+ public void testDelayedInitializationShouldCompleteFetchRequest() throws
Exception {
+ String groupId = "grp";
+ Uuid memberId = Uuid.randomUuid();
+ Uuid fooId = Uuid.randomUuid();
+ TopicIdPartition tp0 = new TopicIdPartition(fooId, new
TopicPartition("foo", 0));
+ Map<TopicIdPartition, Integer> partitionMaxBytes =
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+
+ // Keep 2 initialization futures pending and fail the third with a
LeaderNotAvailableException.
+ CompletableFuture<Void> pendingInitializationFuture1 = new
CompletableFuture<>();
+ CompletableFuture<Void> pendingInitializationFuture2 = new
CompletableFuture<>();
+ when(sp0.maybeInitialize()).
+ thenReturn(pendingInitializationFuture1)
+ .thenReturn(pendingInitializationFuture2)
+ .thenReturn(CompletableFuture.failedFuture(new
LeaderNotAvailableException("Leader not available")));
+
+ DelayedOperationPurgatory<DelayedShareFetch> shareFetchPurgatorySpy =
spy(new DelayedOperationPurgatory<>(
+ "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
+ mockReplicaManagerDelayedShareFetch(mockReplicaManager,
shareFetchPurgatorySpy);
+
+ SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
+
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(mockReplicaManager).withTimer(mockTimer)
+ .build();
+
+ // Send 3 share fetch requests for the same share partition.
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future1 =
+ sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, partitionMaxBytes);
+
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future2 =
+ sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, partitionMaxBytes);
+
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future3 =
+ sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, partitionMaxBytes);
+
+ Mockito.verify(sp0, times(3)).maybeInitialize();
+ Mockito.verify(mockReplicaManager,
times(3)).addDelayedShareFetchRequest(any(), any());
+ Mockito.verify(shareFetchPurgatorySpy,
times(3)).tryCompleteElseWatch(any(), any());
+ Mockito.verify(shareFetchPurgatorySpy,
times(0)).checkAndComplete(any());
+
+ // All 3 requests should be pending.
+ assertFalse(future1.isDone());
+ assertFalse(future2.isDone());
+ assertFalse(future3.isDone());
+
+ // Complete one pending initialization future.
+ pendingInitializationFuture1.complete(null);
+ Mockito.verify(mockReplicaManager,
times(1)).completeDelayedShareFetchRequest(any());
+ Mockito.verify(shareFetchPurgatorySpy,
times(1)).checkAndComplete(any());
+
+ pendingInitializationFuture2.complete(null);
+ Mockito.verify(mockReplicaManager,
times(2)).completeDelayedShareFetchRequest(any());
+ Mockito.verify(shareFetchPurgatorySpy,
times(2)).checkAndComplete(any());
+
+ // Verify that replica manager fetch is not called.
+ Mockito.verify(mockReplicaManager, times(0)).readFromLog(
+ any(), any(), any(ReplicaQuota.class), anyBoolean());
+ }
+
@Test
public void testSharePartitionInitializationExceptions() throws Exception {
String groupId = "grp";
@@ -2100,6 +2164,7 @@ public class SharePartitionManagerTest {
// between SharePartitionManager and SharePartition to retry the
request as SharePartition is not yet ready.
assertFalse(future.isCompletedExceptionally());
assertTrue(future.join().isEmpty());
+ Mockito.verify(sp0, times(0)).markFenced();
// Verify that the share partition is still in the cache on
LeaderNotAvailableException.
assertEquals(1, partitionCacheMap.size());
@@ -2111,6 +2176,7 @@ public class SharePartitionManagerTest {
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0,
Errors.UNKNOWN_SERVER_ERROR, "Illegal state");
+ Mockito.verify(sp0, times(1)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
@@ -2123,6 +2189,7 @@ public class SharePartitionManagerTest {
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0,
Errors.COORDINATOR_NOT_AVAILABLE, "Coordinator not available");
+ Mockito.verify(sp0, times(2)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
@@ -2135,6 +2202,7 @@ public class SharePartitionManagerTest {
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0, Errors.INVALID_REQUEST,
"Invalid request");
+ Mockito.verify(sp0, times(3)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
@@ -2147,6 +2215,7 @@ public class SharePartitionManagerTest {
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0,
Errors.FENCED_STATE_EPOCH, "Fenced state epoch");
+ Mockito.verify(sp0, times(4)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
@@ -2159,6 +2228,7 @@ public class SharePartitionManagerTest {
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0,
Errors.NOT_LEADER_OR_FOLLOWER, "Not leader or follower");
+ Mockito.verify(sp0, times(5)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
@@ -2171,6 +2241,7 @@ public class SharePartitionManagerTest {
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0,
Errors.UNKNOWN_SERVER_ERROR, "Runtime exception");
+ Mockito.verify(sp0, times(6)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
}
@@ -2247,18 +2318,25 @@ public class SharePartitionManagerTest {
public void testSharePartitionPartialInitializationFailure() throws
Exception {
String groupId = "grp";
Uuid memberId1 = Uuid.randomUuid();
+ // For tp0, share partition instantiation will fail.
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ // For tp1, share fetch should succeed.
TopicIdPartition tp1 = new TopicIdPartition(memberId1, new
TopicPartition("foo", 1));
- Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(tp0,
PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES);
-
- // Mark partition1 as not the leader.
- Partition partition1 = mock(Partition.class);
- when(partition1.isLeader()).thenReturn(false);
-
+ // For tp2, share partition initialization will fail.
+ TopicIdPartition tp2 = new TopicIdPartition(memberId1, new
TopicPartition("foo", 2));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(
+ tp0, PARTITION_MAX_BYTES,
+ tp1, PARTITION_MAX_BYTES,
+ tp2, PARTITION_MAX_BYTES);
+
+ // Mark partition0 as not the leader.
+ Partition partition0 = mock(Partition.class);
+ when(partition0.isLeader()).thenReturn(false);
ReplicaManager replicaManager = mock(ReplicaManager.class);
when(replicaManager.getPartitionOrException(any()))
- .thenReturn(partition1);
+ .thenReturn(partition0);
+ // Mock share partition for tp1, so it can succeed.
SharePartition sp1 = mock(SharePartition.class);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
@@ -2268,6 +2346,11 @@ public class SharePartitionManagerTest {
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new
ShareAcquiredRecords(Collections.emptyList(), 0));
+ // Fail initialization for tp2.
+ SharePartition sp2 = mock(SharePartition.class);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+
when(sp2.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new
FencedStateEpochException("Fenced state epoch")));
+
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
@@ -2289,11 +2372,16 @@ public class SharePartitionManagerTest {
assertFalse(future.isCompletedExceptionally());
Map<TopicIdPartition, PartitionData> partitionDataMap = future.get();
- // For now only 1 successful partition is included, this will be fixed
in subsequents PRs.
- assertEquals(1, partitionDataMap.size());
+ assertEquals(3, partitionDataMap.size());
+ assertTrue(partitionDataMap.containsKey(tp0));
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(),
partitionDataMap.get(tp0).errorCode());
assertTrue(partitionDataMap.containsKey(tp1));
assertEquals(Errors.NONE.code(),
partitionDataMap.get(tp1).errorCode());
+ assertTrue(partitionDataMap.containsKey(tp2));
+ assertEquals(Errors.FENCED_STATE_EPOCH.code(),
partitionDataMap.get(tp2).errorCode());
+ assertEquals("Fenced state epoch",
partitionDataMap.get(tp2).errorMessage());
+ Mockito.verify(replicaManager,
times(0)).completeDelayedShareFetchRequest(any());
Mockito.verify(replicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
}
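
The new test above (testDelayedInitializationShouldCompleteFetchRequest) verifies that completing a pending share-partition initialization future nudges the delayed share fetch purgatory so the parked request is re-evaluated. Below is a minimal, self-contained sketch of that wiring using a plain CompletableFuture and a purgatory stand-in; the class and key names are illustrative only, not the actual broker classes.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// A minimal sketch, not the real broker classes: a stand-in purgatory whose
// checkAndComplete is invoked once a pending initialization future completes.
public class DelayedInitCompletionSketch {

    static final class PurgatoryStub {
        final AtomicInteger attempts = new AtomicInteger();

        // Mirrors the idea of re-trying completion of any delayed operation
        // watched under the given key.
        void checkAndComplete(String watchKey) {
            System.out.println("checkAndComplete(" + watchKey + "), attempt " + attempts.incrementAndGet());
        }
    }

    public static void main(String[] args) {
        PurgatoryStub purgatory = new PurgatoryStub();
        CompletableFuture<Void> pendingInitialization = new CompletableFuture<>();

        // Assumed wiring: when the share partition finishes initializing (success
        // or failure), nudge the purgatory so the parked share fetch can complete.
        pendingInitialization.whenComplete((unused, error) -> purgatory.checkAndComplete("grp:foo-0"));

        System.out.println("share fetch parked; initialization still pending");
        pendingInitialization.complete(null); // triggers checkAndComplete("grp:foo-0")
    }
}
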
diff --git
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 8097021e4cb..d2679cd62ea 100644
---
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -16,6 +16,7 @@
*/
package kafka.server
+import kafka.utils.TestUtils
import org.apache.kafka.common.test.api.{ClusterConfigProperty,
ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions,
ClusterTests, Type}
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords
import org.apache.kafka.common.message.{ShareAcknowledgeRequestData,
ShareAcknowledgeResponseData, ShareFetchRequestData, ShareFetchResponseData}
@@ -253,13 +254,26 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
- val shareFetchResponseData = shareFetchResponse.data()
- assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
- assertEquals(1, shareFetchResponseData.responses().size())
- assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
- assertEquals(3,
shareFetchResponseData.responses().get(0).partitions().size())
+ // For the multi-partition fetch request, the response may not be
available in the first attempt
+ // as the share partitions might not be initialized yet. So, we retry
until we get the response.
+ var responses = Seq[ShareFetchResponseData.PartitionData]()
+ TestUtils.waitUntilTrue(() => {
+ val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponseData = shareFetchResponse.data()
+ assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(1, shareFetchResponseData.responses().size())
+ val partitionsCount =
shareFetchResponseData.responses().get(0).partitions().size()
+ if (partitionsCount > 0) {
+ assertEquals(topicId,
shareFetchResponseData.responses().get(0).topicId())
+
shareFetchResponseData.responses().get(0).partitions().foreach(partitionData =>
{
+ if (!partitionData.acquiredRecords().isEmpty) {
+ responses = responses :+ partitionData
+ }
+ })
+ }
+ responses.size == 3
+ }, "Share fetch request failed", 5000)
val expectedPartitionData1 = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(0)
@@ -279,7 +293,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0),
Collections.singletonList(9), Collections.singletonList(1)))
-
shareFetchResponseData.responses().get(0).partitions().foreach(partitionData =>
{
+ responses.foreach(partitionData => {
partitionData.partitionIndex() match {
case 0 => compareFetchResponsePartitions(expectedPartitionData1,
partitionData)
case 1 => compareFetchResponsePartitions(expectedPartitionData2,
partitionData)
@@ -2230,13 +2244,26 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
- var shareFetchResponseData = shareFetchResponse.data()
- assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
- assertEquals(1, shareFetchResponseData.responses().size())
- assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
- assertEquals(2,
shareFetchResponseData.responses().get(0).partitions().size())
+ // For the multi-partition fetch request, the response may not be
available in the first attempt
+ // as the share partitions might not be initialized yet. So, we retry
until we get the response.
+ var responses = Seq[ShareFetchResponseData.PartitionData]()
+ TestUtils.waitUntilTrue(() => {
+ val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponseData = shareFetchResponse.data()
+ assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(1, shareFetchResponseData.responses().size())
+ val partitionsCount =
shareFetchResponseData.responses().get(0).partitions().size()
+ if (partitionsCount > 0) {
+ assertEquals(topicId,
shareFetchResponseData.responses().get(0).topicId())
+
shareFetchResponseData.responses().get(0).partitions().foreach(partitionData =>
{
+ if (!partitionData.acquiredRecords().isEmpty) {
+ responses = responses :+ partitionData
+ }
+ })
+ }
+ responses.size == 2
+ }, "Share fetch request failed", 5000)
// Producing 10 more records to the topic partitions created above
produceData(topicIdPartition1, 10)
@@ -2247,9 +2274,9 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, Seq.empty, forget, acknowledgementsMap)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
- shareFetchResponseData = shareFetchResponse.data()
+ val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
@@ -2265,10 +2292,25 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
compareFetchResponsePartitions(expectedPartitionData, partitionData)
}
+ // For the initial fetch request, the response may not be available in the first
attempt when the share
+ // partition is not initialized yet. Hence, wait for the response from all
partitions before proceeding.
private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String,
topicIdPartitions: Seq[TopicIdPartition]): Unit = {
- val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
- val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty)
- connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val partitions: util.Set[Integer] = new util.HashSet()
+ TestUtils.waitUntilTrue(() => {
+ val metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty)
+ val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponseData = shareFetchResponse.data()
+
+ assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ shareFetchResponseData.responses().foreach(response => {
+ if (!response.partitions().isEmpty) {
+ response.partitions().forEach(partitionData =>
partitions.add(partitionData.partitionIndex))
+ }
+ })
+
+ partitions.size() == topicIdPartitions.size
+ }, "Share fetch request failed", 5000)
}
private def expectedAcquiredRecords(firstOffsets: util.List[Long],
lastOffsets: util.List[Long], deliveryCounts: util.List[Int]):
util.List[AcquiredRecords] = {
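
The Scala test changes above replace single-shot assertions with TestUtils.waitUntilTrue polling, since the first share fetch may return no partitions while initialization is still pending. The same retry-until-condition pattern is sketched below in Java; the helper name, timeout and poll interval are illustrative, not part of the Kafka test utilities.

import java.util.function.BooleanSupplier;

// A minimal sketch of the poll-until-condition pattern used by the updated tests.
// Kafka's Scala tests use kafka.utils.TestUtils.waitUntilTrue for the same purpose.
public final class WaitUntilSketch {

    static void waitUntilTrue(BooleanSupplier condition, String failureMessage,
                              long timeoutMs, long pollIntervalMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return; // e.g. responses collected for all expected partitions
            }
            Thread.sleep(pollIntervalMs);
        }
        throw new AssertionError(failureMessage);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Wait until at least 100 ms have elapsed, polling every 10 ms.
        waitUntilTrue(() -> System.currentTimeMillis() - start >= 100,
            "condition never became true", 5000, 10);
        System.out.println("condition met");
    }
}
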
diff --git
a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
new file mode 100644
index 00000000000..7c60501ffd6
--- /dev/null
+++ b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.storage.log.FetchParams;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * The ShareFetch class stores the fetch parameters, the response future and
any erroneous partitions for a share fetch request.
+ */
+public class ShareFetch {
+
+ /**
+ * The future that will be completed when the fetch is done.
+ */
+ private final CompletableFuture<Map<TopicIdPartition, PartitionData>>
future;
+
+ /**
+ * The fetch parameters for the fetch request.
+ */
+ private final FetchParams fetchParams;
+ /**
+ * The group id of the share group that is fetching the records.
+ */
+ private final String groupId;
+ /**
+ * The member id of the share group that is fetching the records.
+ */
+ private final String memberId;
+ /**
+ * The maximum number of bytes that can be fetched for each partition.
+ */
+ private final Map<TopicIdPartition, Integer> partitionMaxBytes;
+ /**
+ * The maximum number of records that can be fetched for the request.
+ */
+ private final int maxFetchRecords;
+ /**
+ * The partitions that had an error during the fetch.
+ */
+ private Map<TopicIdPartition, Throwable> erroneous;
+
+ public ShareFetch(
+ FetchParams fetchParams,
+ String groupId,
+ String memberId,
+ CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+ Map<TopicIdPartition, Integer> partitionMaxBytes,
+ int maxFetchRecords
+ ) {
+ this.fetchParams = fetchParams;
+ this.groupId = groupId;
+ this.memberId = memberId;
+ this.future = future;
+ this.partitionMaxBytes = partitionMaxBytes;
+ this.maxFetchRecords = maxFetchRecords;
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public String memberId() {
+ return memberId;
+ }
+
+ public Map<TopicIdPartition, Integer> partitionMaxBytes() {
+ return partitionMaxBytes;
+ }
+
+ public FetchParams fetchParams() {
+ return fetchParams;
+ }
+
+ public int maxFetchRecords() {
+ return maxFetchRecords;
+ }
+
+ /**
+ * Add an erroneous partition to the share fetch request. If the erroneous
map is null, it will
+ * be created.
+ * <p>
+ * The method is synchronized to avoid concurrent modification of the
erroneous map, as pending
+ * initialization for some partitions may complete on other threads while the
share fetch request
+ * for the remaining partitions is being processed in purgatory.
+ *
+ * @param topicIdPartition The partition that had an error.
+ * @param throwable The error that occurred.
+ */
+ public synchronized void addErroneous(TopicIdPartition topicIdPartition,
Throwable throwable) {
+ if (erroneous == null) {
+ erroneous = new HashMap<>();
+ }
+ erroneous.put(topicIdPartition, throwable);
+ }
+
+ /**
+ * Check if the share fetch request is completed.
+ * @return true if the request is completed, false otherwise.
+ */
+ public boolean isCompleted() {
+ return future.isDone();
+ }
+
+ /**
+ * Check if all the partitions in the request have errored.
+ * @return true if all the partitions in the request have errored, false
otherwise.
+ */
+ public synchronized boolean errorInAllPartitions() {
+ return erroneous != null && erroneous.size() ==
partitionMaxBytes().size();
+ }
+
+ /**
+ * Maybe complete the share fetch request with the given partition data.
If the request is already completed,
+ * this method does nothing. If there are any erroneous partitions, they
will be added to the response.
+ *
+ * @param partitionData The partition data to complete the fetch with.
+ */
+ public void maybeComplete(Map<TopicIdPartition, PartitionData>
partitionData) {
+ if (isCompleted()) {
+ return;
+ }
+
+ Map<TopicIdPartition, PartitionData> response = new
HashMap<>(partitionData);
+ // Add any erroneous partitions to the response.
+ addErroneousToResponse(response);
+ future.complete(response);
+ }
+
+ /**
+ * Maybe complete the share fetch request with the given exception for the
topicIdPartitions.
+ * If the request is already completed, this method does nothing. If there
are any erroneous partitions,
+ * they will be added to the response.
+ *
+ * @param topicIdPartitions The topic id partitions which errored out.
+ * @param throwable The exception to complete the fetch with.
+ */
+ public void maybeCompleteWithException(Collection<TopicIdPartition>
topicIdPartitions, Throwable throwable) {
+ if (isCompleted()) {
+ return;
+ }
+ Map<TopicIdPartition, PartitionData> response =
topicIdPartitions.stream().collect(
+ Collectors.toMap(tp -> tp, tp -> new PartitionData()
+ .setErrorCode(Errors.forException(throwable).code())
+ .setErrorMessage(throwable.getMessage())));
+ // Add any erroneous partitions to the response.
+ addErroneousToResponse(response);
+ future.complete(response);
+ }
+
+ /**
+ * Filter out the erroneous partitions from the given set of
topicIdPartitions. The order of
+ * partitions is important, hence the method expects an ordered set as
input and returns the ordered
+ * set as well.
+ *
+ * @param topicIdPartitions The topic id partitions to filter.
+ * @return The topic id partitions without the erroneous partitions.
+ */
+ public synchronized Set<TopicIdPartition>
filterErroneousTopicPartitions(Set<TopicIdPartition> topicIdPartitions) {
+ if (erroneous != null) {
+ Set<TopicIdPartition> retain = new
LinkedHashSet<>(topicIdPartitions);
+ retain.removeAll(erroneous.keySet());
+ return retain;
+ }
+ return topicIdPartitions;
+ }
+
+ private synchronized void addErroneousToResponse(Map<TopicIdPartition,
PartitionData> response) {
+ if (erroneous != null) {
+ erroneous.forEach((topicIdPartition, throwable) -> {
+ response.put(topicIdPartition, new PartitionData()
+ .setErrorCode(Errors.forException(throwable).code())
+ .setErrorMessage(throwable.getMessage()));
+ });
+ }
+ }
+}
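
As a rough usage sketch of the new ShareFetch class defined above (constructor and methods exactly as in the diff): erroneous partitions are recorded as they fail, and whichever maybeComplete call eventually fires folds them into the response. FetchParams is mocked with Mockito here, as ShareFetchTest does, purely to keep the sketch short; group, member and byte values are arbitrary examples.

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchParams;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.mockito.Mockito.mock;

// A hedged usage sketch of ShareFetch as defined in the diff above.
public class ShareFetchUsageSketch {

    public static void main(String[] args) {
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));

        CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
        ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), "grp", "member-1",
            future, Map.of(tp0, 1024, tp1, 1024), 100);

        // tp0 failed during initialization; record the error instead of failing the whole fetch.
        shareFetch.addErroneous(tp0, new RuntimeException("initialization failed"));
        // Not all partitions have errored, so the fetch can still proceed for tp1.
        System.out.println(shareFetch.errorInAllPartitions()); // false

        // Complete with data for the healthy partition; the erroneous one is merged in.
        shareFetch.maybeComplete(Map.of(tp1, new PartitionData().setPartitionIndex(1)));

        // The completed response covers both partitions: tp1 with data, tp0 with an error code.
        System.out.println(future.join().keySet());
    }
}
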
diff --git
a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
deleted file mode 100644
index c32b3280017..00000000000
---
a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.server.share.fetch;
-
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
-import org.apache.kafka.server.storage.log.FetchParams;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * The ShareFetchData class is used to store the fetch parameters for a share
fetch request.
- */
-public class ShareFetchData {
-
- private final FetchParams fetchParams;
- private final String groupId;
- private final String memberId;
- private final CompletableFuture<Map<TopicIdPartition, PartitionData>>
future;
- private final Map<TopicIdPartition, Integer> partitionMaxBytes;
- private final int maxFetchRecords;
-
- public ShareFetchData(
- FetchParams fetchParams,
- String groupId,
- String memberId,
- CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
- Map<TopicIdPartition, Integer> partitionMaxBytes,
- int maxFetchRecords
- ) {
- this.fetchParams = fetchParams;
- this.groupId = groupId;
- this.memberId = memberId;
- this.future = future;
- this.partitionMaxBytes = partitionMaxBytes;
- this.maxFetchRecords = maxFetchRecords;
- }
-
- public String groupId() {
- return groupId;
- }
-
- public String memberId() {
- return memberId;
- }
-
- public CompletableFuture<Map<TopicIdPartition, PartitionData>> future() {
- return future;
- }
-
- public Map<TopicIdPartition, Integer> partitionMaxBytes() {
- return partitionMaxBytes;
- }
-
- public FetchParams fetchParams() {
- return fetchParams;
- }
-
- public int maxFetchRecords() {
- return maxFetchRecords;
- }
-}
diff --git
a/share/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
b/share/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
new file mode 100644
index 00000000000..bf5de1ae0de
--- /dev/null
+++
b/share/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.storage.log.FetchParams;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class ShareFetchTest {
+
+ private static final String GROUP_ID = "groupId";
+ private static final String MEMBER_ID = "memberId";
+
+ @Test
+ public void testErrorInAllPartitions() {
+ TopicIdPartition topicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+ ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class),
GROUP_ID, MEMBER_ID, new CompletableFuture<>(),
+ Map.of(topicIdPartition, 10), 100);
+ assertFalse(shareFetch.errorInAllPartitions());
+
+ shareFetch.addErroneous(topicIdPartition, new RuntimeException());
+ assertTrue(shareFetch.errorInAllPartitions());
+ }
+
+ @Test
+ public void testErrorInAllPartitionsWithMultipleTopicIdPartitions() {
+ TopicIdPartition topicIdPartition0 = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+ TopicIdPartition topicIdPartition1 = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+ ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class),
GROUP_ID, MEMBER_ID, new CompletableFuture<>(),
+ Map.of(topicIdPartition0, 10, topicIdPartition1, 10), 100);
+ assertFalse(shareFetch.errorInAllPartitions());
+
+ shareFetch.addErroneous(topicIdPartition0, new RuntimeException());
+ assertFalse(shareFetch.errorInAllPartitions());
+
+ shareFetch.addErroneous(topicIdPartition1, new RuntimeException());
+ assertTrue(shareFetch.errorInAllPartitions());
+ }
+
+ @Test
+ public void testFilterErroneousTopicPartitions() {
+ TopicIdPartition topicIdPartition0 = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+ TopicIdPartition topicIdPartition1 = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+ ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class),
GROUP_ID, MEMBER_ID, new CompletableFuture<>(),
+ Map.of(topicIdPartition0, 10, topicIdPartition1, 10), 100);
+ Set<TopicIdPartition> result =
shareFetch.filterErroneousTopicPartitions(Set.of(topicIdPartition0,
topicIdPartition1));
+ // No erroneous partitions, hence all partitions should be returned.
+ assertEquals(2, result.size());
+ assertTrue(result.contains(topicIdPartition0));
+ assertTrue(result.contains(topicIdPartition1));
+
+ // Add an erroneous partition and verify that it is filtered out.
+ shareFetch.addErroneous(topicIdPartition0, new RuntimeException());
+ result =
shareFetch.filterErroneousTopicPartitions(Set.of(topicIdPartition0,
topicIdPartition1));
+ assertEquals(1, result.size());
+ assertTrue(result.contains(topicIdPartition1));
+
+ // Add another erroneous partition and verify that it is filtered out.
+ shareFetch.addErroneous(topicIdPartition1, new RuntimeException());
+ result =
shareFetch.filterErroneousTopicPartitions(Set.of(topicIdPartition0,
topicIdPartition1));
+ assertTrue(result.isEmpty());
+ }
+
+}
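
The new unit tests above cover errorInAllPartitions and filterErroneousTopicPartitions; maybeCompleteWithException is exercised indirectly via SharePartitionManagerTest. A small, hedged sketch of that path (again mocking FetchParams as the tests do) might look like the following; the expected UNKNOWN_SERVER_ERROR mapping follows from Errors.forException on an IllegalStateException, as asserted earlier in SharePartitionManagerTest.

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchParams;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.mockito.Mockito.mock;

// A hedged sketch of completing a ShareFetch exceptionally for its partitions.
public class ShareFetchExceptionSketch {

    public static void main(String[] args) {
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
        CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
        ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), "grp", "member-1",
            future, Map.of(tp0, 1024), 100);

        // Complete the fetch exceptionally; each partition gets the mapped error code and message.
        shareFetch.maybeCompleteWithException(List.of(tp0), new IllegalStateException("Illegal state"));

        PartitionData data = future.join().get(tp0);
        System.out.println(data.errorCode() + " / " + data.errorMessage());
    }
}
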