This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6abbd548b86 MINOR Refactoring share fetch code (KIP-932) (#17269)
6abbd548b86 is described below
commit 6abbd548b86c61e8675bef8a4858144af21f4732
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Sep 26 13:09:31 2024 +0100
MINOR Refactoring share fetch code (KIP-932) (#17269)
Reviewers: Andrew Schofield <[email protected]>, David Arthur <[email protected]>
---
build.gradle | 1 +
checkstyle/import-control-share.xml | 4 +
.../java/kafka/server/share/DelayedShareFetch.java | 57 ++++-----
.../kafka/server/share/DelayedShareFetchKey.java | 8 +-
.../java/kafka/server/share/ShareFetchUtils.java | 14 ++-
.../java/kafka/server/share/SharePartition.java | 2 +-
.../kafka/server/share/SharePartitionManager.java | 128 ++++-----------------
core/src/main/scala/kafka/server/KafkaApis.scala | 3 +-
.../kafka/server/share/DelayedShareFetchTest.java | 66 +++++------
.../kafka/server/share/ShareFetchUtilsTest.java | 34 +++---
.../server/share/SharePartitionManagerTest.java | 120 +++++++++----------
.../kafka/server/share/SharePartitionTest.java | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../kafka/server/share/SharePartitionKey.java | 65 ++++++-----
.../ShareAcknowledgementBatch.java | 2 +-
.../kafka/server/share/fetch/ShareFetchData.java | 71 ++++++++++++
16 files changed, 297 insertions(+), 283 deletions(-)
diff --git a/build.gradle b/build.gradle
index 64b9d488365..01c8aae3caf 100644
--- a/build.gradle
+++ b/build.gradle
@@ -954,6 +954,7 @@ project(':share') {
dependencies {
implementation project(':server-common')
+ implementation project(':storage')
implementation libs.slf4jApi
diff --git a/checkstyle/import-control-share.xml b/checkstyle/import-control-share.xml
index f0022d6b318..05ddfff90ac 100644
--- a/checkstyle/import-control-share.xml
+++ b/checkstyle/import-control-share.xml
@@ -43,6 +43,10 @@
<subpackage name="server">
<subpackage name="share">
<allow pkg="org.apache.kafka.server.share" />
+
+ <subpackage name="fetch">
+ <allow class="org.apache.kafka.storage.internals.log.FetchParams"/>
+ </subpackage>
</subpackage>
</subpackage>
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index ef80047f222..35608082684 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -23,6 +23,8 @@ import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
import org.slf4j.Logger;
@@ -46,19 +48,20 @@ import scala.runtime.BoxedUnit;
* A delayed share fetch operation has been introduced in case there is a
share fetch request which cannot be completed instantaneously.
*/
public class DelayedShareFetch extends DelayedOperation {
- private final SharePartitionManager.ShareFetchPartitionData
shareFetchPartitionData;
+
+ private final ShareFetchData shareFetchData;
private final ReplicaManager replicaManager;
- private final Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap;
+ private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
private Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionDataFromTryComplete = new LinkedHashMap<>();
private static final Logger log =
LoggerFactory.getLogger(DelayedShareFetch.class);
DelayedShareFetch(
- SharePartitionManager.ShareFetchPartitionData
shareFetchPartitionData,
+ ShareFetchData shareFetchData,
ReplicaManager replicaManager,
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap) {
- super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty());
- this.shareFetchPartitionData = shareFetchPartitionData;
+ Map<SharePartitionKey, SharePartition> partitionCacheMap) {
+ super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
+ this.shareFetchData = shareFetchData;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
}
@@ -75,10 +78,10 @@ public class DelayedShareFetch extends DelayedOperation {
@Override
public void onComplete() {
log.trace("Completing the delayed share fetch request for group {},
member {}, " +
- "topic partitions {}",
shareFetchPartitionData.groupId(),
- shareFetchPartitionData.memberId(),
shareFetchPartitionData.partitionMaxBytes().keySet());
+ "topic partitions {}", shareFetchData.groupId(),
+ shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
- if (shareFetchPartitionData.future().isDone())
+ if (shareFetchData.future().isDone())
return;
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
@@ -91,14 +94,14 @@ public class DelayedShareFetch extends DelayedOperation {
try {
if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we
complete the request with an empty response.
-
shareFetchPartitionData.future().complete(Collections.emptyMap());
+ shareFetchData.future().complete(Collections.emptyMap());
return;
}
log.trace("Fetchable share partitions data: {} with groupId: {}
fetch params: {}",
- topicPartitionData, shareFetchPartitionData.groupId(),
shareFetchPartitionData.fetchParams());
+ topicPartitionData, shareFetchData.groupId(),
shareFetchData.fetchParams());
Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
- shareFetchPartitionData.fetchParams(),
+ shareFetchData.fetchParams(),
CollectionConverters.asScala(
topicPartitionData.entrySet().stream().map(entry ->
new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
@@ -116,23 +119,23 @@ public class DelayedShareFetch extends DelayedOperation {
});
log.trace("Data successfully retrieved by replica manager: {}",
responseData);
- ShareFetchUtils.processFetchResponse(shareFetchPartitionData,
responseData, partitionCacheMap, replicaManager)
+ ShareFetchUtils.processFetchResponse(shareFetchData, responseData,
partitionCacheMap, replicaManager)
.whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("Error processing fetch response for share
partitions", throwable);
-
shareFetchPartitionData.future().completeExceptionally(throwable);
+
shareFetchData.future().completeExceptionally(throwable);
} else {
- shareFetchPartitionData.future().complete(result);
+ shareFetchData.future().complete(result);
}
// Releasing the lock to move ahead with the next request
in queue.
- releasePartitionLocks(shareFetchPartitionData.groupId(),
topicPartitionData.keySet());
+ releasePartitionLocks(shareFetchData.groupId(),
topicPartitionData.keySet());
});
} catch (Exception e) {
// Release the locks acquired for the partitions in the share
fetch request in case there is an exception
log.error("Error processing delayed share fetch request", e);
- shareFetchPartitionData.future().completeExceptionally(e);
- releasePartitionLocks(shareFetchPartitionData.groupId(),
topicPartitionData.keySet());
+ shareFetchData.future().completeExceptionally(e);
+ releasePartitionLocks(shareFetchData.groupId(),
topicPartitionData.keySet());
}
}
@@ -142,16 +145,16 @@ public class DelayedShareFetch extends DelayedOperation {
@Override
public boolean tryComplete() {
log.trace("Try to complete the delayed share fetch request for group
{}, member {}, topic partitions {}",
- shareFetchPartitionData.groupId(),
shareFetchPartitionData.memberId(),
- shareFetchPartitionData.partitionMaxBytes().keySet());
+ shareFetchData.groupId(), shareFetchData.memberId(),
+ shareFetchData.partitionMaxBytes().keySet());
topicPartitionDataFromTryComplete = acquirablePartitions();
if (!topicPartitionDataFromTryComplete.isEmpty())
return forceComplete();
log.info("Can't acquire records for any partition in the share fetch
request for group {}, member {}, " +
- "topic partitions {}", shareFetchPartitionData.groupId(),
- shareFetchPartitionData.memberId(),
shareFetchPartitionData.partitionMaxBytes().keySet());
+ "topic partitions {}", shareFetchData.groupId(),
+ shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
return false;
}
@@ -163,11 +166,11 @@ public class DelayedShareFetch extends DelayedOperation {
// Initialize the topic partitions for which the fetch should be
attempted.
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
new LinkedHashMap<>();
-
shareFetchPartitionData.partitionMaxBytes().keySet().forEach(topicIdPartition
-> {
- SharePartition sharePartition = partitionCacheMap.get(new
SharePartitionManager.SharePartitionKey(
- shareFetchPartitionData.groupId(), topicIdPartition));
+ shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition
-> {
+ SharePartition sharePartition = partitionCacheMap.get(new
SharePartitionKey(
+ shareFetchData.groupId(), topicIdPartition));
- int partitionMaxBytes =
shareFetchPartitionData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
+ int partitionMaxBytes =
shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
// Add the share partition to the list of partitions to be fetched
only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
@@ -195,6 +198,6 @@ public class DelayedShareFetch extends DelayedOperation {
private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
topicIdPartitions.forEach(tp -> partitionCacheMap.get(new
- SharePartitionManager.SharePartitionKey(groupId,
tp)).releaseFetchLock());
+ SharePartitionKey(groupId, tp)).releaseFetchLock());
}
}
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
index e4fe1a9af7f..a4c13f8f1a8 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
@@ -19,19 +19,17 @@ package kafka.server.share;
import kafka.server.DelayedOperationKey;
import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.share.SharePartitionKey;
import java.util.Objects;
/**
* A key for delayed operations that fetch data for share consumers.
*/
-public class DelayedShareFetchKey implements DelayedOperationKey {
- private final String groupId;
- private final TopicIdPartition topicIdPartition;
+public class DelayedShareFetchKey extends SharePartitionKey implements
DelayedOperationKey {
DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) {
- this.groupId = groupId;
- this.topicIdPartition = topicIdPartition;
+ super(groupId, topicIdPartition);
}
@Override
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index b16156a62bc..a8f8a04dbd8 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
import org.slf4j.Logger;
@@ -45,20 +47,20 @@ public class ShareFetchUtils {
// Process the replica manager fetch response to update share partitions
and futures. We acquire the fetched data
// from share partitions.
static CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> processFetchResponse(
- SharePartitionManager.ShareFetchPartitionData
shareFetchPartitionData,
+ ShareFetchData shareFetchData,
Map<TopicIdPartition, FetchPartitionData> responseData,
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap,
+ Map<SharePartitionKey, SharePartition> partitionCacheMap,
ReplicaManager replicaManager
) {
Map<TopicIdPartition,
CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new
HashMap<>();
responseData.forEach((topicIdPartition, fetchPartitionData) -> {
- SharePartition sharePartition = partitionCacheMap.get(new
SharePartitionManager.SharePartitionKey(
- shareFetchPartitionData.groupId(), topicIdPartition));
- futures.put(topicIdPartition,
sharePartition.acquire(shareFetchPartitionData.memberId(), fetchPartitionData)
+ SharePartition sharePartition = partitionCacheMap.get(new
SharePartitionKey(
+ shareFetchData.groupId(), topicIdPartition));
+ futures.put(topicIdPartition,
sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData)
.handle((acquiredRecords, throwable) -> {
log.trace("Acquired records for topicIdPartition: {}
with share fetch data: {}, records: {}",
- topicIdPartition, shareFetchPartitionData,
acquiredRecords);
+ topicIdPartition, shareFetchData,
acquiredRecords);
ShareFetchResponseData.PartitionData partitionData =
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(topicIdPartition.partition());
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index e8519939187..d2a8e783d7b 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -41,9 +41,9 @@ import org.apache.kafka.server.share.PartitionStateBatchData;
import org.apache.kafka.server.share.Persister;
import org.apache.kafka.server.share.PersisterStateBatch;
import org.apache.kafka.server.share.ReadShareGroupStateParameters;
-import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.TopicData;
import org.apache.kafka.server.share.WriteShareGroupStateParameters;
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 3679f1c719a..28697a1441d 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -40,10 +40,12 @@ import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.Persister;
-import org.apache.kafka.server.share.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
@@ -103,7 +105,7 @@ public class SharePartitionManager implements AutoCloseable {
/**
* The fetch queue stores the share fetch requests that are waiting to be
processed.
*/
- private final ConcurrentLinkedQueue<ShareFetchPartitionData> fetchQueue;
+ private final ConcurrentLinkedQueue<ShareFetchData> fetchQueue;
/**
* The process fetch queue lock is used to ensure that only one thread is
processing the fetch queue at a time.
@@ -203,7 +205,7 @@ public class SharePartitionManager implements AutoCloseable {
Time time,
ShareSessionCache cache,
Map<SharePartitionKey, SharePartition> partitionCacheMap,
- ConcurrentLinkedQueue<ShareFetchPartitionData> fetchQueue,
+ ConcurrentLinkedQueue<ShareFetchData> fetchQueue,
int recordLockDurationMs,
Timer timer,
int maxDeliveryCount,
@@ -248,8 +250,8 @@ public class SharePartitionManager implements AutoCloseable {
partitionMaxBytes.keySet(), groupId, fetchParams);
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new
CompletableFuture<>();
- ShareFetchPartitionData shareFetchPartitionData = new
ShareFetchPartitionData(fetchParams, groupId, memberId, future,
partitionMaxBytes);
- fetchQueue.add(shareFetchPartitionData);
+ ShareFetchData shareFetchData = new ShareFetchData(fetchParams,
groupId, memberId, future, partitionMaxBytes);
+ fetchQueue.add(shareFetchData);
maybeProcessFetchQueue();
return future;
@@ -518,8 +520,9 @@ public class SharePartitionManager implements AutoCloseable {
// Add the share fetch request to the delayed share fetch purgatory to
process the fetch request if it can be
// completed else watch until it can be completed/timeout.
- private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch,
Set<Object> keys) {
- delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch,
CollectionConverters.asScala(keys).toSeq());
+ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch,
Set<DelayedShareFetchKey> keys) {
+ delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch,
+ CollectionConverters.asScala(keys).toSeq().indices());
}
@Override
@@ -529,7 +532,7 @@ public class SharePartitionManager implements AutoCloseable {
this.persister.stop();
if (!fetchQueue.isEmpty()) {
log.warn("Closing SharePartitionManager with pending fetch
requests count: {}", fetchQueue.size());
- fetchQueue.forEach(shareFetchPartitionData ->
shareFetchPartitionData.future.completeExceptionally(
+ fetchQueue.forEach(shareFetchData ->
shareFetchData.future().completeExceptionally(
Errors.BROKER_NOT_AVAILABLE.exception()));
fetchQueue.clear();
}
@@ -553,17 +556,17 @@ public class SharePartitionManager implements AutoCloseable {
return;
}
- ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();
- if (shareFetchPartitionData == null) {
+ ShareFetchData shareFetchData = fetchQueue.poll();
+ if (shareFetchData == null) {
// No more requests to process, so release the lock. Though we
should not reach here as the lock
// is acquired only when there are requests in the queue. But
still, it's safe to release the lock.
releaseProcessFetchQueueLock();
return;
}
- if (shareFetchPartitionData.partitionMaxBytes.isEmpty()) {
+ if (shareFetchData.partitionMaxBytes().isEmpty()) {
// If there are no partitions to fetch then complete the future
with an empty map.
- shareFetchPartitionData.future.complete(Collections.emptyMap());
+ shareFetchData.future().complete(Collections.emptyMap());
// Release the lock so that other threads can process the queue.
releaseProcessFetchQueueLock();
if (!fetchQueue.isEmpty())
@@ -572,9 +575,9 @@ public class SharePartitionManager implements AutoCloseable {
}
try {
-
shareFetchPartitionData.partitionMaxBytes.keySet().forEach(topicIdPartition -> {
+
shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition -> {
SharePartitionKey sharePartitionKey = sharePartitionKey(
- shareFetchPartitionData.groupId,
+ shareFetchData.groupId(),
topicIdPartition
);
SharePartition sharePartition =
fetchSharePartition(sharePartitionKey);
@@ -585,19 +588,19 @@ public class SharePartitionManager implements AutoCloseable {
// TopicPartitionData list will be populated only if the share
partition is already initialized.
sharePartition.maybeInitialize().whenComplete((result,
throwable) -> {
if (throwable != null) {
-
maybeCompleteInitializationWithException(sharePartitionKey,
shareFetchPartitionData.future, throwable);
+
maybeCompleteInitializationWithException(sharePartitionKey,
shareFetchData.future(), throwable);
return;
}
});
});
- Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
- shareFetchPartitionData.partitionMaxBytes.keySet().forEach(
+ Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new
HashSet<>();
+ shareFetchData.partitionMaxBytes().keySet().forEach(
topicIdPartition -> delayedShareFetchWatchKeys.add(
- new DelayedShareFetchKey(shareFetchPartitionData.groupId,
topicIdPartition)));
+ new DelayedShareFetchKey(shareFetchData.groupId(),
topicIdPartition)));
// Add the share fetch to the delayed share fetch purgatory to
process the fetch request.
- addDelayedShareFetch(new
DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap),
+ addDelayedShareFetch(new DelayedShareFetch(shareFetchData,
replicaManager, partitionCacheMap),
delayedShareFetchWatchKeys);
// Release the lock so that other threads can process the queue.
@@ -621,8 +624,8 @@ public class SharePartitionManager implements AutoCloseable {
k -> {
long start = time.hiResClockMs();
SharePartition partition = new SharePartition(
- sharePartitionKey.groupId,
- sharePartitionKey.topicIdPartition,
+ sharePartitionKey.groupId(),
+ sharePartitionKey.topicIdPartition(),
maxInFlightMessages,
maxDeliveryCount,
recordLockDurationMs,
@@ -679,89 +682,6 @@ public class SharePartitionManager implements AutoCloseable {
return new SharePartitionKey(groupId, topicIdPartition);
}
- /**
- * The SharePartitionKey is used to uniquely identify a share partition.
The key is made up of the
- * share group id, the topic id and the partition id. The key is used to
store the SharePartition
- * objects in the partition cache map.
- */
- // Visible for testing
- static class SharePartitionKey {
- private final String groupId;
- private final TopicIdPartition topicIdPartition;
-
- public SharePartitionKey(String groupId, TopicIdPartition
topicIdPartition) {
- this.groupId = Objects.requireNonNull(groupId);
- this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(groupId, topicIdPartition);
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (this == obj)
- return true;
- else if (obj == null || getClass() != obj.getClass())
- return false;
- else {
- SharePartitionKey that = (SharePartitionKey) obj;
- return groupId.equals(that.groupId) &&
Objects.equals(topicIdPartition, that.topicIdPartition);
- }
- }
-
- @Override
- public String toString() {
- return "SharePartitionKey{" +
- "groupId='" + groupId +
- ", topicIdPartition=" + topicIdPartition +
- '}';
- }
- }
-
- /**
- * The ShareFetchPartitionData class is used to store the fetch parameters
for a share fetch request.
- */
- // Visible for testing
- static class ShareFetchPartitionData {
- private final FetchParams fetchParams;
- private final String groupId;
- private final String memberId;
- private final CompletableFuture<Map<TopicIdPartition, PartitionData>>
future;
- private final Map<TopicIdPartition, Integer> partitionMaxBytes;
-
- public ShareFetchPartitionData(FetchParams fetchParams, String
groupId, String memberId,
- CompletableFuture<Map<TopicIdPartition,
PartitionData>> future,
- Map<TopicIdPartition, Integer>
partitionMaxBytes) {
- this.fetchParams = fetchParams;
- this.groupId = groupId;
- this.memberId = memberId;
- this.future = future;
- this.partitionMaxBytes = partitionMaxBytes;
- }
-
- public String groupId() {
- return groupId;
- }
-
- public String memberId() {
- return memberId;
- }
-
- public CompletableFuture<Map<TopicIdPartition, PartitionData>>
future() {
- return future;
- }
-
- public Map<TopicIdPartition, Integer> partitionMaxBytes() {
- return partitionMaxBytes;
- }
-
- public FetchParams fetchParams() {
- return fetchParams;
- }
- }
-
static class ShareGroupMetrics {
/**
* share-acknowledgement (share-acknowledgement-rate and
share-acknowledgement-count) - The total number of offsets acknowledged for
share groups (requests to be ack).
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 29f1b270ebc..2586c562cf5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -77,7 +77,8 @@ import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, RequestLoc
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0,
IBP_2_3_IV0}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.share.context.ShareFetchContext
-import org.apache.kafka.server.share.{ErroneousAndValidPartitionData,
ShareAcknowledgementBatch}
+import org.apache.kafka.server.share.ErroneousAndValidPartitionData
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation,
FetchParams, FetchPartitionData}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index cc490c0bd97..85f50c0f38a 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -25,6 +25,8 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchParams;
@@ -68,11 +70,11 @@ public class DelayedShareFetchTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
ConcurrentHashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes);
@@ -80,7 +82,7 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch =
DelayedShareFetchBuilder.builder()
- .withShareFetchPartitionData(shareFetchPartitionData)
+ .withShareFetchData(shareFetchData)
.withPartitionCacheMap(partitionCacheMap)
.build();
@@ -105,11 +107,11 @@ public class DelayedShareFetchTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
ConcurrentHashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes);
@@ -117,7 +119,7 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch =
DelayedShareFetchBuilder.builder()
- .withShareFetchPartitionData(shareFetchPartitionData)
+ .withShareFetchData(shareFetchData)
.withPartitionCacheMap(partitionCacheMap)
.build();
assertFalse(delayedShareFetch.isCompleted());
@@ -144,11 +146,11 @@ public class DelayedShareFetchTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
ConcurrentHashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes);
@@ -156,7 +158,7 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch =
DelayedShareFetchBuilder.builder()
- .withShareFetchPartitionData(shareFetchPartitionData)
+ .withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@@ -164,7 +166,7 @@ public class DelayedShareFetchTest {
delayedShareFetch.forceComplete();
// Since no partition could be acquired, the future should be empty
and replicaManager.readFromLog should not be called.
- assertEquals(0, shareFetchPartitionData.future().join().size());
+ assertEquals(0, shareFetchData.future().join().size());
Mockito.verify(replicaManager, times(0)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
assertTrue(delayedShareFetch.isCompleted());
@@ -187,11 +189,11 @@ public class DelayedShareFetchTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
ConcurrentHashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes);
@@ -199,7 +201,7 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch =
DelayedShareFetchBuilder.builder()
- .withShareFetchPartitionData(shareFetchPartitionData)
+ .withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@@ -225,11 +227,11 @@ public class DelayedShareFetchTest {
SharePartition sp0 = mock(SharePartition.class);
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
ConcurrentHashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes);
@@ -238,7 +240,7 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
- .withShareFetchPartitionData(shareFetchPartitionData)
+ .withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build());
@@ -249,7 +251,7 @@ public class DelayedShareFetchTest {
assertTrue(delayedShareFetch.isCompleted());
// Verifying that the first forceComplete calls acquirablePartitions
method in DelayedShareFetch.
Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
- assertEquals(0, shareFetchPartitionData.future().join().size());
+ assertEquals(0, shareFetchData.future().join().size());
// Force completing the share fetch request for the second time should
hit the future completion check and not
// proceed ahead in the function.
@@ -260,12 +262,12 @@ public class DelayedShareFetchTest {
}
static class DelayedShareFetchBuilder {
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= mock(SharePartitionManager.ShareFetchPartitionData.class);
+ ShareFetchData shareFetchData = mock(ShareFetchData.class);
private ReplicaManager replicaManager = mock(ReplicaManager.class);
- private Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
+ private Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
- DelayedShareFetchBuilder
withShareFetchPartitionData(SharePartitionManager.ShareFetchPartitionData
shareFetchPartitionData) {
- this.shareFetchPartitionData = shareFetchPartitionData;
+ DelayedShareFetchBuilder withShareFetchData(ShareFetchData
shareFetchData) {
+ this.shareFetchData = shareFetchData;
return this;
}
@@ -274,7 +276,7 @@ public class DelayedShareFetchTest {
return this;
}
- DelayedShareFetchBuilder
withPartitionCacheMap(Map<SharePartitionManager.SharePartitionKey,
SharePartition> partitionCacheMap) {
+ DelayedShareFetchBuilder withPartitionCacheMap(Map<SharePartitionKey,
SharePartition> partitionCacheMap) {
this.partitionCacheMap = partitionCacheMap;
return this;
}
@@ -285,7 +287,7 @@ public class DelayedShareFetchTest {
public DelayedShareFetch build() {
return new DelayedShareFetch(
- shareFetchPartitionData,
+ shareFetchData,
replicaManager,
partitionCacheMap);
}
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index dad92125b24..0f4d1c64ccb 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
@@ -90,11 +92,11 @@ public class ShareFetchUtilsTest {
doNothing().when(sp1).updateCacheAndOffsets(any(Long.class));
doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
ConcurrentHashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes);
@@ -119,7 +121,7 @@ public class ShareFetchUtilsTest {
records1, Optional.empty(), OptionalLong.empty(),
Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> result =
- ShareFetchUtils.processFetchResponse(shareFetchPartitionData,
responseData, partitionCacheMap, mock(ReplicaManager.class));
+ ShareFetchUtils.processFetchResponse(shareFetchData,
responseData, partitionCacheMap, mock(ReplicaManager.class));
assertTrue(result.isDone());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData
= result.join();
@@ -161,11 +163,11 @@ public class ShareFetchUtilsTest {
doNothing().when(sp1).updateCacheAndOffsets(any(Long.class));
doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
ConcurrentHashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes);
@@ -178,7 +180,7 @@ public class ShareFetchUtilsTest {
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(),
Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> result =
- ShareFetchUtils.processFetchResponse(shareFetchPartitionData,
responseData, partitionCacheMap, mock(ReplicaManager.class));
+ ShareFetchUtils.processFetchResponse(shareFetchData,
responseData, partitionCacheMap, mock(ReplicaManager.class));
assertTrue(result.isDone());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData
= result.join();
@@ -207,11 +209,11 @@ public class ShareFetchUtilsTest {
SharePartition sp0 = Mockito.mock(SharePartition.class);
SharePartition sp1 = Mockito.mock(SharePartition.class);
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new ConcurrentHashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
ConcurrentHashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()),
groupId, Uuid.randomUuid().toString(), new
CompletableFuture<>(), partitionMaxBytes);
@@ -251,7 +253,7 @@ public class ShareFetchUtilsTest {
records1, Optional.empty(), OptionalLong.empty(),
Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> result1 =
- ShareFetchUtils.processFetchResponse(shareFetchPartitionData,
responseData1, partitionCacheMap, replicaManager);
+ ShareFetchUtils.processFetchResponse(shareFetchData,
responseData1, partitionCacheMap, replicaManager);
assertTrue(result1.isDone());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData>
resultData1 = result1.join();
@@ -282,7 +284,7 @@ public class ShareFetchUtilsTest {
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(),
Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> result2 =
- ShareFetchUtils.processFetchResponse(shareFetchPartitionData,
responseData2, partitionCacheMap, replicaManager);
+ ShareFetchUtils.processFetchResponse(shareFetchData,
responseData2, partitionCacheMap, replicaManager);
assertTrue(result2.isDone());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData>
resultData2 = result2.join();
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 861233cda9c..9529c678301 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -53,10 +53,12 @@ import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.apache.kafka.server.share.NoOpShareStatePersister;
import org.apache.kafka.server.share.Persister;
-import org.apache.kafka.server.share.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
@@ -978,17 +980,17 @@ public class SharePartitionManagerTest {
@Test
public void testSharePartitionKey() {
- SharePartitionManager.SharePartitionKey sharePartitionKey1 = new
SharePartitionManager.SharePartitionKey("mock-group-1",
+ SharePartitionKey sharePartitionKey1 = new
SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 1L), new
TopicPartition("test", 0)));
- SharePartitionManager.SharePartitionKey sharePartitionKey2 = new
SharePartitionManager.SharePartitionKey("mock-group-2",
+ SharePartitionKey sharePartitionKey2 = new
SharePartitionKey("mock-group-2",
new TopicIdPartition(new Uuid(0L, 1L), new
TopicPartition("test", 0)));
- SharePartitionManager.SharePartitionKey sharePartitionKey3 = new
SharePartitionManager.SharePartitionKey("mock-group-1",
+ SharePartitionKey sharePartitionKey3 = new
SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(1L, 1L), new
TopicPartition("test-1", 0)));
- SharePartitionManager.SharePartitionKey sharePartitionKey4 = new
SharePartitionManager.SharePartitionKey("mock-group-1",
+ SharePartitionKey sharePartitionKey4 = new
SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 1L), new
TopicPartition("test", 1)));
- SharePartitionManager.SharePartitionKey sharePartitionKey5 = new
SharePartitionManager.SharePartitionKey("mock-group-1",
+ SharePartitionKey sharePartitionKey5 = new
SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 0L), new
TopicPartition("test-2", 0)));
- SharePartitionManager.SharePartitionKey sharePartitionKey1Copy = new
SharePartitionManager.SharePartitionKey("mock-group-1",
+ SharePartitionKey sharePartitionKey1Copy = new
SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 1L), new
TopicPartition("test", 0)));
assertEquals(sharePartitionKey1, sharePartitionKey1Copy);
@@ -1182,8 +1184,8 @@ public class SharePartitionManagerTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
@@ -1305,9 +1307,9 @@ public class SharePartitionManagerTest {
partitionMap.add(new CachedSharePartition(tp3));
when(shareSession.partitionMap()).thenReturn(partitionMap);
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
.withCache(cache)
@@ -1439,8 +1441,8 @@ public class SharePartitionManagerTest {
when(sp.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(null));
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp), sp);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();
@@ -1475,10 +1477,10 @@ public class SharePartitionManagerTest {
when(sp2.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(null));
when(sp3.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(null));
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp3), sp3);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
Metrics metrics = new Metrics();
SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
@@ -1565,8 +1567,8 @@ public class SharePartitionManagerTest {
TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
SharePartition sp = mock(SharePartition.class);
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp), sp);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();
@@ -1594,8 +1596,8 @@ public class SharePartitionManagerTest {
when(sp.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(FutureUtils.failedFuture(
new InvalidRequestException("Member is not the owner of batch
record")
));
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp), sp);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();
@@ -1649,16 +1651,16 @@ public class SharePartitionManagerTest {
final Time time = new MockTime();
ReplicaManager replicaManager = mock(ReplicaManager.class);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData1
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData1 = new ShareFetchData(
fetchParams, groupId, memberId, new CompletableFuture<>(),
Collections.emptyMap());
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData2 = new ShareFetchData(
fetchParams, groupId, memberId, new CompletableFuture<>(),
partitionMaxBytes);
- ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData>
fetchQueue = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new
ConcurrentLinkedQueue<>();
// First request added to fetch queue is empty i.e. no topic
partitions to fetch.
- fetchQueue.add(shareFetchPartitionData1);
+ fetchQueue.add(shareFetchData1);
// Second request added to fetch queue has a topic partition to fetch.
- fetchQueue.add(shareFetchPartitionData2);
+ fetchQueue.add(shareFetchData2);
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
@@ -1704,11 +1706,11 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp2).acknowledge(ArgumentMatchers.eq(memberId), any());
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()),
groupId,
@@ -1731,7 +1733,7 @@ public class SharePartitionManagerTest {
partitionMaxBytes.keySet().forEach(topicIdPartition ->
delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId,
topicIdPartition)));
DelayedShareFetch delayedShareFetch =
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchPartitionData(shareFetchPartitionData)
+ .withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@@ -1799,12 +1801,12 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp3).acknowledge(ArgumentMatchers.eq(memberId), any());
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp3), sp3);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()),
groupId,
@@ -1829,7 +1831,7 @@ public class SharePartitionManagerTest {
partitionMaxBytes.keySet().forEach(topicIdPartition ->
delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId,
topicIdPartition)));
DelayedShareFetch delayedShareFetch =
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchPartitionData(shareFetchPartitionData)
+ .withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@@ -1893,11 +1895,11 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()),
groupId,
@@ -1920,7 +1922,7 @@ public class SharePartitionManagerTest {
partitionMaxBytes.keySet().forEach(topicIdPartition ->
delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId,
topicIdPartition)));
DelayedShareFetch delayedShareFetch =
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchPartitionData(shareFetchPartitionData)
+ .withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@@ -1992,12 +1994,12 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp3).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp3), sp3);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
- SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData
= new SharePartitionManager.ShareFetchPartitionData(
+ ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()),
groupId,
@@ -2022,7 +2024,7 @@ public class SharePartitionManagerTest {
partitionMaxBytes.keySet().forEach(topicIdPartition ->
delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId,
topicIdPartition)));
DelayedShareFetch delayedShareFetch =
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
- .withShareFetchPartitionData(shareFetchPartitionData)
+ .withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@@ -2067,8 +2069,8 @@ public class SharePartitionManagerTest {
Map<TopicIdPartition, Integer> partitionMaxBytes =
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
SharePartition sp0 = mock(SharePartition.class);
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
// Keep the initialization future pending, so fetch request is stuck.
CompletableFuture<Void> pendingInitializationFuture = new
CompletableFuture<>();
@@ -2111,8 +2113,8 @@ public class SharePartitionManagerTest {
Map<TopicIdPartition, Integer> partitionMaxBytes =
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
SharePartition sp0 = mock(SharePartition.class);
- Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@@ -2181,7 +2183,7 @@ public class SharePartitionManagerTest {
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
// Return NotLeaderOrFollowerException to simulate initialization
failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new
NotLeaderOrFollowerException("Not leader or follower")));
future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), fetchParams, partitionMaxBytes);
@@ -2195,7 +2197,7 @@ public class SharePartitionManagerTest {
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence
re-add the share partition to cache.
- partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
// Return RuntimeException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new
RuntimeException("Runtime exception")));
future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), fetchParams, partitionMaxBytes);
@@ -2289,11 +2291,11 @@ public class SharePartitionManagerTest {
private ReplicaManager replicaManager = mock(ReplicaManager.class);
private Time time = new MockTime();
private ShareSessionCache cache = new ShareSessionCache(10, 1000);
- private Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
+ private Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
private Persister persister = NoOpShareStatePersister.getInstance();
private Timer timer = new MockTimer();
private Metrics metrics = new Metrics();
- private
ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData> fetchQueue
= new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new
ConcurrentLinkedQueue<>();
private DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class);
private SharePartitionManagerBuilder withReplicaManager(ReplicaManager
replicaManager) {
@@ -2311,7 +2313,7 @@ public class SharePartitionManagerTest {
return this;
}
- private SharePartitionManagerBuilder
withPartitionCacheMap(Map<SharePartitionManager.SharePartitionKey,
SharePartition> partitionCacheMap) {
+ private SharePartitionManagerBuilder
withPartitionCacheMap(Map<SharePartitionKey, SharePartition> partitionCacheMap)
{
this.partitionCacheMap = partitionCacheMap;
return this;
}
@@ -2331,7 +2333,7 @@ public class SharePartitionManagerTest {
return this;
}
- private SharePartitionManagerBuilder
withFetchQueue(ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData>
fetchQueue) {
+ private SharePartitionManagerBuilder
withFetchQueue(ConcurrentLinkedQueue<ShareFetchData> fetchQueue) {
this.fetchQueue = fetchQueue;
return this;
}
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 4d821db3dc6..14ad6d5deb9 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -42,9 +42,9 @@ import org.apache.kafka.server.share.PartitionFactory;
import org.apache.kafka.server.share.Persister;
import org.apache.kafka.server.share.PersisterStateBatch;
import org.apache.kafka.server.share.ReadShareGroupStateResult;
-import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.TopicData;
import org.apache.kafka.server.share.WriteShareGroupStateResult;
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a658b49b567..7972e9c5355 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -88,8 +88,9 @@ import
org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_I
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures,
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal}
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs,
ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
-import org.apache.kafka.server.share.{CachedSharePartition,
ErroneousAndValidPartitionData, ShareAcknowledgementBatch}
+import org.apache.kafka.server.share.{CachedSharePartition,
ErroneousAndValidPartitionData}
import org.apache.kafka.server.quota.ThrottleCallback
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.share.context.{FinalContext,
ShareSessionContext}
import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
import org.apache.kafka.server.util.{FutureUtils, MockTime}
diff --git
a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
index 6e56edbf3bb..8a2d7eb4f46 100644
--- a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
+++ b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
@@ -17,36 +17,48 @@
package org.apache.kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import java.util.Objects;
/**
- * Common immutable share partition key class. This class is
- * placed in server-common so that it can be freely used across
- * various modules.
+ * The SharePartitionKey is used to uniquely identify a share partition. The
key is made up of the
+ * share group id, the topic id and the partition id. The key is used to store
the SharePartition
+ * objects in the partition cache map.
*/
public class SharePartitionKey {
- private final String groupId;
- private final Uuid topicId;
- private final int partition;
+
+ protected final String groupId;
+ protected final TopicIdPartition topicIdPartition;
+
+ public SharePartitionKey(String groupId, TopicIdPartition
topicIdPartition) {
+ this.groupId = Objects.requireNonNull(groupId);
+ this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
+ }
private SharePartitionKey(String groupId, Uuid topicId, int partition) {
- this.groupId = groupId;
- this.topicId = topicId;
- this.partition = partition;
+ this(groupId, topicId, null, partition);
+ }
+
+ private SharePartitionKey(String groupId, Uuid topicId, String topic, int
partition) {
+ this(groupId, new TopicIdPartition(Objects.requireNonNull(topicId),
new TopicPartition(topic, partition)));
}
public String groupId() {
return groupId;
}
+ public TopicIdPartition topicIdPartition() {
+ return topicIdPartition;
+ }
+
public Uuid topicId() {
- return topicId;
+ return topicIdPartition.topicId();
}
public int partition() {
- return partition;
+ return topicIdPartition.partition();
}
public static SharePartitionKey getInstance(String groupId,
TopicIdPartition topicIdPartition) {
@@ -57,33 +69,28 @@ public class SharePartitionKey {
return new SharePartitionKey(groupId, topicId, partition);
}
- public String asCoordinatorKey() {
- return asCoordinatorKey(groupId, topicId, partition);
- }
-
- public static String asCoordinatorKey(String groupId, Uuid topicId, int
partition) {
- return String.format("%s:%s:%d", groupId, topicId, partition);
- }
-
@Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof SharePartitionKey)) return false;
- SharePartitionKey that = (SharePartitionKey) o;
- return partition == that.partition && Objects.equals(groupId,
that.groupId) && Objects.equals(topicId, that.topicId);
+ public boolean equals(final Object obj) {
+ if (this == obj)
+ return true;
+ else if (obj == null || getClass() != obj.getClass())
+ return false;
+ else {
+ SharePartitionKey that = (SharePartitionKey) obj;
+ return groupId.equals(that.groupId) &&
Objects.equals(topicIdPartition, that.topicIdPartition);
+ }
}
@Override
public int hashCode() {
- return Objects.hash(groupId, topicId, partition);
+ return Objects.hash(groupId, topicIdPartition);
}
@Override
public String toString() {
return "SharePartitionKey{" +
- "groupId=" + groupId +
- ",topicId=" + topicId +
- ",partition=" + partition +
- "}";
+ "groupId='" + groupId +
+ ", topicIdPartition=" + topicIdPartition +
+ '}';
}
}
diff --git
a/share/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java
b/share/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
similarity index 97%
rename from
share/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java
rename to
share/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
index 5aacbee0322..b23104d6dab 100644
---
a/share/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java
+++
b/share/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.acknowledge;
import java.util.List;
diff --git
a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
new file mode 100644
index 00000000000..667c05df047
--- /dev/null
+++
b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
+import org.apache.kafka.storage.internals.log.FetchParams;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The ShareFetchData class is used to store the fetch parameters for a share
fetch request.
+ */
+public class ShareFetchData {
+
+ private final FetchParams fetchParams;
+ private final String groupId;
+ private final String memberId;
+ private final CompletableFuture<Map<TopicIdPartition, PartitionData>>
future;
+ private final Map<TopicIdPartition, Integer> partitionMaxBytes;
+
+ public ShareFetchData(
+ FetchParams fetchParams,
+ String groupId,
+ String memberId,
+ CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+ Map<TopicIdPartition, Integer> partitionMaxBytes
+ ) {
+ this.fetchParams = fetchParams;
+ this.groupId = groupId;
+ this.memberId = memberId;
+ this.future = future;
+ this.partitionMaxBytes = partitionMaxBytes;
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public String memberId() {
+ return memberId;
+ }
+
+ public CompletableFuture<Map<TopicIdPartition, PartitionData>> future() {
+ return future;
+ }
+
+ public Map<TopicIdPartition, Integer> partitionMaxBytes() {
+ return partitionMaxBytes;
+ }
+
+ public FetchParams fetchParams() {
+ return fetchParams;
+ }
+}