This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 76a1c839cf1 KAFKA-19340 Move DelayedRemoteFetch to the storage module (#19876)
76a1c839cf1 is described below
commit 76a1c839cf1f6e4ac2bc3317a73e65955428a1f6
Author: Lan Ding <[email protected]>
AuthorDate: Sat Oct 25 00:01:24 2025 +0800
KAFKA-19340 Move DelayedRemoteFetch to the storage module (#19876)
Move DelayedRemoteFetch to the storage module and rewrite it in Java.
Reviewers: Mickael Maison <[email protected]>, Kamal Chandraprakash <[email protected]>, Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control-storage.xml | 5 +-
.../java/kafka/server/share/DelayedShareFetch.java | 5 +-
.../kafka/server/share/PendingRemoteFetches.java | 2 +-
.../src/main/scala/kafka/server/DelayedFetch.scala | 12 +-
.../scala/kafka/server/DelayedRemoteFetch.scala | 146 ------
.../main/scala/kafka/server/ReplicaManager.scala | 56 ++-
.../kafka/server/share/DelayedShareFetchTest.java | 10 +-
.../server/share/SharePartitionManagerTest.java | 4 +-
.../kafka/server/DelayedFetchTest.scala | 29 +-
.../kafka/server/DelayedRemoteFetchTest.scala | 501 ---------------------
.../AbstractCoordinatorConcurrencyTest.scala | 2 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 6 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 10 +-
.../kafka/server/purgatory/DelayedRemoteFetch.java | 192 ++++++++
.../internals/log/FetchPartitionStatus.java | 29 ++
.../storage/internals/log}/LogReadResult.java | 68 +--
.../server/purgatory/DelayedRemoteFetchTest.java | 480 ++++++++++++++++++++
17 files changed, 781 insertions(+), 776 deletions(-)
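
One recurring pattern worth noting before the hunks (an editorial observation,
not part of the commit): LogReadResult moves to
org.apache.kafka.storage.internals.log and its trailing Optional<Throwable>
error argument becomes an Errors code, so the old
ReplicaManager.createLogReadResult(e) helper collapses into a one-argument
constructor. Sketched from the hunks below, with the exception value assumed:

    // Error-path result, before: built via ReplicaManager.createLogReadResult(e),
    // carrying Optional.of(e). After: built directly from an Errors code.
    LogReadResult errorResult = new LogReadResult(Errors.forException(exception));
    FetchPartitionData data = errorResult.toFetchPartitionData(false);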
diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index 2a0f7412685..e4194b20e2c 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -84,6 +84,10 @@
</subpackage>
</subpackage>
</subpackage>
+
+ <subpackage name="purgatory">
+ <allow pkg="org.apache.kafka.server.storage.log" />
+ </subpackage>
</subpackage>
<subpackage name="storage.internals">
@@ -164,7 +168,6 @@
<allow pkg="org.apache.kafka.server.log.remote.storage" />
<allow pkg="scala.jdk.javaapi" />
<allow pkg="org.apache.kafka.test" />
-
</subpackage>
</import-control>
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 969029a6ea5..7b061a28bd5 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.errors.NotLeaderException;
-import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
@@ -46,6 +45,7 @@ import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
+import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
@@ -852,13 +852,12 @@ public class DelayedShareFetch extends DelayedOperation {
if (remoteFetch.remoteFetchResult().isDone()) {
RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
if (remoteLogReadResult.error().isPresent()) {
- Throwable error = remoteLogReadResult.error().get();
// If there is any error for the remote fetch topic partition, we populate the error accordingly.
shareFetchPartitionData.add(
new ShareFetchPartitionData(
remoteFetch.topicIdPartition(),
partitionsAcquired.get(remoteFetch.topicIdPartition()),
- ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+ new LogReadResult(Errors.forException(remoteLogReadResult.error().get())).toFetchPartitionData(false)
)
);
)
);
} else {
diff --git a/core/src/main/java/kafka/server/share/PendingRemoteFetches.java b/core/src/main/java/kafka/server/share/PendingRemoteFetches.java
index 575a32ef466..c25cc706775 100644
--- a/core/src/main/java/kafka/server/share/PendingRemoteFetches.java
+++ b/core/src/main/java/kafka/server/share/PendingRemoteFetches.java
@@ -17,8 +17,8 @@
package kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 91480bb420e..ebdc0000440 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -24,25 +24,15 @@ import java.util.concurrent.TimeUnit
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
-import org.apache.kafka.storage.internals.log.LogOffsetMetadata
+import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata}
import scala.collection._
import scala.jdk.CollectionConverters._
-case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) {
-
- override def toString: String = {
- "[startOffsetMetadata: " + startOffsetMetadata +
- ", fetchInfo: " + fetchInfo +
- "]"
- }
-}
-
/**
 * A delayed fetch operation that can be created by the replica manager and watched
* in the fetch operation purgatory
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
deleted file mode 100644
index fc2926988c0..00000000000
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import com.yammer.metrics.core.Meter
-import kafka.utils.Logging
-import org.apache.kafka.common.TopicIdPartition
-import org.apache.kafka.common.errors._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.server.LogReadResult
-import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.purgatory.DelayedOperation
-import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
-import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
-
-import java.util
-import java.util.concurrent.{CompletableFuture, Future, TimeUnit}
-import java.util.{Optional, OptionalInt, OptionalLong}
-import scala.collection._
-
-/**
- * A remote fetch operation that can be created by the replica manager and watched
- * in the remote fetch operation purgatory
- */
-class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Void]],
- remoteFetchResults: util.Map[TopicIdPartition, CompletableFuture[RemoteLogReadResult]],
- remoteFetchInfos: util.Map[TopicIdPartition, RemoteStorageFetchInfo],
- remoteFetchMaxWaitMs: Long,
- fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
- fetchParams: FetchParams,
- localReadResults: Seq[(TopicIdPartition, LogReadResult)],
- replicaManager: ReplicaManager,
- responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
- extends DelayedOperation(remoteFetchMaxWaitMs) with Logging {
-
- if (fetchParams.isFromFollower) {
- throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams")
- }
-
- /**
- * The operation can be completed if:
- *
- * Case a: This broker is no longer the leader of the partition it tries to fetch
- * Case b: This broker does not know the partition it tries to fetch
- * Case c: All the remote storage read request completed (succeeded or failed)
- * Case d: The partition is in an offline log directory on this broker
- *
- * Upon completion, should return whatever data is available for each valid partition
- */
- override def tryComplete(): Boolean = {
- fetchPartitionStatus.foreach {
- case (topicPartition, fetchStatus) =>
- val fetchOffset = fetchStatus.startOffsetMetadata
- try {
- if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
- replicaManager.getPartitionOrException(topicPartition.topicPartition())
- }
- } catch {
- case _: KafkaStorageException => // Case d
- debug(s"Partition $topicPartition is in an offline log directory,
satisfy $fetchParams immediately")
- return forceComplete()
- case _: UnknownTopicOrPartitionException => // Case b
- debug(s"Broker no longer knows of partition $topicPartition,
satisfy $fetchParams immediately")
- return forceComplete()
- case _: NotLeaderOrFollowerException => // Case a
- debug("Broker is no longer the leader or follower of %s, satisfy
%s immediately".format(topicPartition, fetchParams))
- return forceComplete()
- }
- }
- // Case c
- if (remoteFetchResults.values().stream().allMatch(taskResult =>
taskResult.isDone))
- forceComplete()
- else
- false
- }
-
- override def onExpiration(): Unit = {
- // cancel the remote storage read task, if it has not been executed yet and
- // avoid interrupting the task if it is already running as it may force closing opened/cached resources as transaction index.
- remoteFetchTasks.forEach { (topicIdPartition, task) =>
- if (task != null && !task.isDone) {
- if (!task.cancel(false)) {
- debug(s"Remote fetch task for remoteFetchInfo:
${remoteFetchInfos.get(topicIdPartition)} could not be cancelled.")
- }
- }
- }
-
- DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
- }
-
- /**
- * Upon completion, read whatever data is available and pass to the complete callback
- */
- override def onComplete(): Unit = {
- val fetchPartitionData = localReadResults.map { case (tp, result) =>
- val remoteFetchResult = remoteFetchResults.get(tp)
- if (remoteFetchResults.containsKey(tp)
- && remoteFetchResult.isDone
- && result.error == Errors.NONE
- && result.info.delayedRemoteStorageFetch.isPresent) {
- if (remoteFetchResult.get.error.isPresent) {
- tp -> ReplicaManager.createLogReadResult(remoteFetchResult.get.error.get).toFetchPartitionData(false)
- } else {
- val info = remoteFetchResult.get.fetchDataInfo.get
- tp -> new FetchPartitionData(
- result.error,
- result.highWatermark,
- result.leaderLogStartOffset,
- info.records,
- Optional.empty(),
- if (result.lastStableOffset.isPresent) OptionalLong.of(result.lastStableOffset.getAsLong) else OptionalLong.empty(),
- info.abortedTransactions,
- if (result.preferredReadReplica.isPresent) OptionalInt.of(result.preferredReadReplica.getAsInt) else OptionalInt.empty(),
- false)
- }
- } else {
- tp -> result.toFetchPartitionData(false)
- }
- }
-
- responseCallback(fetchPartitionData)
- }
-}
-
-object DelayedRemoteFetchMetrics {
- // Changing the package or class name may cause incompatibility with existing code and metrics configuration
- private val metricsPackage = "kafka.server"
- private val metricsClassName = "DelayedRemoteFetchMetrics"
- private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
- val expiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
-}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index a4c5416cb34..1d9d076a317 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -56,16 +56,16 @@ import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, LogReadResult, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import java.io.File
@@ -188,19 +188,7 @@ object ReplicaManager {
-1L,
-1L,
OptionalLong.empty(),
- Optional.of(e))
- }
-
- def createLogReadResult(e: Throwable): LogReadResult = {
- new LogReadResult(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
- Optional.empty(),
- UnifiedLog.UNKNOWN_OFFSET,
- UnifiedLog.UNKNOWN_OFFSET,
- UnifiedLog.UNKNOWN_OFFSET,
- UnifiedLog.UNKNOWN_OFFSET,
- -1L,
- OptionalLong.empty(),
- Optional.of(e))
+ Errors.forException(e));
}
private[server] def isListOffsetsTimestampUnsupported(timestamp: JLong, version: Short): Boolean = {
@@ -1639,7 +1627,7 @@ class ReplicaManager(val config: KafkaConfig,
private def processRemoteFetches(remoteFetchInfos: util.LinkedHashMap[TopicIdPartition, RemoteStorageFetchInfo],
params: FetchParams,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
- logReadResults: Seq[(TopicIdPartition, LogReadResult)],
+ logReadResults: util.LinkedHashMap[TopicIdPartition, LogReadResult],
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
@@ -1651,8 +1639,15 @@ class ReplicaManager(val config: KafkaConfig,
}
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
- val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
- fetchPartitionStatus, params, logReadResults, this, responseCallback)
+ val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks,
+ remoteFetchResults,
+ remoteFetchInfos,
+ remoteFetchMaxWaitMs,
+ fetchPartitionStatus.toMap.asJava,
+ params,
+ logReadResults,
+ tp => getPartitionOrException(tp),
+ response => responseCallback(response.asScala.toSeq))
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
@@ -1681,7 +1676,7 @@ class ReplicaManager(val config: KafkaConfig,
var hasDivergingEpoch = false
var hasPreferredReadReplica = false
- val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
+ val logReadResultMap = new util.LinkedHashMap[TopicIdPartition, LogReadResult]
logReadResults.foreach { case (topicIdPartition, logReadResult) =>
brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
@@ -1717,14 +1712,15 @@ class ReplicaManager(val config: KafkaConfig,
// construct the fetch results from the read results
val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)]
fetchInfos.foreach { case (topicIdPartition, partitionData) =>
- logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
+ val logReadResult = logReadResultMap.get(topicIdPartition)
+ if (logReadResult != null) {
val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
- fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
- })
+ fetchPartitionStatus += (topicIdPartition -> new FetchPartitionStatus(logOffsetMetadata, partitionData))
+ }
}
if (!remoteFetchInfos.isEmpty) {
- processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResults, fetchPartitionStatus.toSeq)
+ processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus.toSeq)
} else {
// If there is not enough data to respond and there is no remote data, we will let the fetch request
// wait for new data.
@@ -1812,7 +1808,7 @@ class ReplicaManager(val config: KafkaConfig,
-1L,
OptionalLong.of(offsetSnapshot.lastStableOffset.messageOffset),
if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(),
- Optional.empty())
+ Errors.NONE)
} else {
log = partition.localLogWithEpochOrThrow(fetchInfo.currentLeaderEpoch, params.fetchOnlyLeader())
@@ -1836,7 +1832,7 @@ class ReplicaManager(val config: KafkaConfig,
fetchTimeMs,
OptionalLong.of(readInfo.lastStableOffset),
if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(),
- Optional.empty()
+ Errors.NONE
)
}
} catch {
@@ -1849,7 +1845,7 @@ class ReplicaManager(val config: KafkaConfig,
_: ReplicaNotAvailableException |
_: KafkaStorageException |
_: InconsistentTopicIdException) =>
- createLogReadResult(e)
+ new LogReadResult(Errors.forException(e))
case e: OffsetOutOfRangeException =>
handleOffsetOutOfRangeError(tp, params, fetchInfo, adjustedMaxBytes, minOneMessage, log, fetchTimeMs, e)
case e: Throwable =>
@@ -1868,7 +1864,7 @@ class ReplicaManager(val config: KafkaConfig,
UnifiedLog.UNKNOWN_OFFSET,
-1L,
OptionalLong.empty(),
- Optional.of(e)
+ Errors.forException(e)
)
}
}
@@ -1949,10 +1945,10 @@ class ReplicaManager(val config: KafkaConfig,
fetchInfo.logStartOffset,
fetchTimeMs,
OptionalLong.of(log.lastStableOffset),
- Optional.empty[Throwable]())
+ Errors.NONE)
}
} else {
- createLogReadResult(exception)
+ new LogReadResult(Errors.forException(exception))
}
}
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index ffa9f8b1145..8aab8eb5495 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@@ -52,6 +51,7 @@ import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
+import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
@@ -2023,7 +2023,7 @@ public class DelayedShareFetchTest {
-1L,
OptionalLong.empty(),
OptionalInt.empty(),
- Optional.empty()
+ Errors.NONE
));
when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
when(pendingRemoteFetches.isDone()).thenReturn(false);
@@ -2104,7 +2104,7 @@ public class DelayedShareFetchTest {
-1L,
OptionalLong.empty(),
OptionalInt.empty(),
- Optional.empty()
+ Errors.NONE
));
when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
when(pendingRemoteFetches.isDone()).thenReturn(false);
@@ -2179,7 +2179,7 @@ public class DelayedShareFetchTest {
-1L,
OptionalLong.empty(),
OptionalInt.empty(),
- Optional.empty()
+ Errors.NONE
))));
remoteReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
REMOTE_FETCH_INFO,
@@ -2191,7 +2191,7 @@ public class DelayedShareFetchTest {
-1L,
OptionalLong.empty(),
OptionalInt.empty(),
- Optional.empty()
+ Errors.NONE
))));
return CollectionConverters.asScala(logReadResults).toSeq();
}
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 24a84bab64a..541719aea06 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -51,7 +51,6 @@ import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
-import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@@ -84,6 +83,7 @@ import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
@@ -3226,7 +3226,7 @@ public class SharePartitionManagerTest {
-1L,
OptionalLong.empty(),
OptionalInt.empty(),
- Optional.empty()
+ Errors.NONE
))));
return CollectionConverters.asScala(logReadResults).toSeq();
}
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index f10beb0086f..fa3b8465d65 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -25,9 +25,8 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
-import org.apache.kafka.server.LogReadResult
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogOffsetMetadata, LogOffsetSnapshot}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogOffsetMetadata, LogOffsetSnapshot, LogReadResult}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
@@ -48,9 +47,9 @@ class DelayedFetchTest {
val currentLeaderEpoch = Optional.of[Integer](10)
val replicaId = 1
- val fetchStatus = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
- fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchStatus = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset),
+ new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
var fetchResultOpt: Option[FetchPartitionData] = None
@@ -94,9 +93,9 @@ class DelayedFetchTest {
val currentLeaderEpoch = Optional.of[Integer](10)
val replicaId = 1
- val fetchStatus = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
- fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchStatus = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset),
+ new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
var fetchResultOpt: Option[FetchPartitionData] = None
@@ -134,9 +133,9 @@ class DelayedFetchTest {
val lastFetchedEpoch = Optional.of[Integer](9)
val replicaId = 1
- val fetchStatus = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
- fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch))
+ val fetchStatus = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset),
+ new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
var fetchResultOpt: Option[FetchPartitionData] = None
@@ -185,9 +184,9 @@ class DelayedFetchTest {
val currentLeaderEpoch = Optional.of[Integer](10)
val replicaId = 1
- val fetchStatus = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
- fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchStatus = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset),
+ new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
var fetchResultOpt: Option[FetchPartitionData] = None
@@ -265,7 +264,7 @@ class DelayedFetchTest {
-1L,
-1L,
OptionalLong.empty(),
- if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
+ error)
}
}
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
deleted file mode 100644
index 23b4b32b0d7..00000000000
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ /dev/null
@@ -1,501 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import com.yammer.metrics.core.Meter
-import kafka.cluster.Partition
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.FetchRequest
-import org.apache.kafka.common.{TopicIdPartition, Uuid}
-import org.apache.kafka.server.LogReadResult
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
-import org.apache.kafka.storage.internals.log._
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers.anyBoolean
-import org.mockito.Mockito.{mock, never, verify, when}
-
-import java.util.{Collections, Optional, OptionalLong}
-import java.util.concurrent.{CompletableFuture, Future}
-import scala.collection._
-import scala.jdk.CollectionConverters._
-
-class DelayedRemoteFetchTest {
- private val maxBytes = 1024
- private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
- private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
- private val topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic2")
- private val fetchOffset = 500L
- private val logStartOffset = 0L
- private val currentLeaderEpoch = Optional.of[Integer](10)
- private val remoteFetchMaxWaitMs = 500
-
- private val fetchStatus = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
- fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
- private val fetchParams = buildFetchParams(replicaId = -1, maxWaitMs = 500)
-
- @Test
- def testFetch(): Unit = {
- var actualTopicPartition: Option[TopicIdPartition] = None
- var fetchResultOpt: Option[FetchPartitionData] = None
-
- def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
- assertEquals(1, responses.size)
- actualTopicPartition = Some(responses.head._1)
- fetchResultOpt = Some(responses.head._2)
- }
-
- val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- future.complete(buildRemoteReadResult(Errors.NONE))
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
- val highWatermark = 100
- val leaderLogStartOffset = 10
- val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
-
- val delayedRemoteFetch = new DelayedRemoteFetch(
- java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
- java.util.Collections.singletonMap(topicIdPartition, future),
- java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
- remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus),
- fetchParams,
- Seq(topicIdPartition -> logReadInfo),
- replicaManager,
- callback)
-
- when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
- .thenReturn(mock(classOf[Partition]))
-
- assertTrue(delayedRemoteFetch.tryComplete())
- assertTrue(delayedRemoteFetch.isCompleted)
- assertTrue(actualTopicPartition.isDefined)
- assertEquals(topicIdPartition, actualTopicPartition.get)
- assertTrue(fetchResultOpt.isDefined)
-
- val fetchResult = fetchResultOpt.get
- assertEquals(Errors.NONE, fetchResult.error)
- assertEquals(highWatermark, fetchResult.highWatermark)
- assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
- }
-
- @Test
- def testFollowerFetch(): Unit = {
- var actualTopicPartition: Option[TopicIdPartition] = None
- var fetchResultOpt: Option[FetchPartitionData] = None
-
- def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
- assertEquals(1, responses.size)
- actualTopicPartition = Some(responses.head._1)
- fetchResultOpt = Some(responses.head._2)
- }
-
- val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- future.complete(buildRemoteReadResult(Errors.NONE))
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
- val highWatermark = 100
- val leaderLogStartOffset = 10
- val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
- val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
-
- assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(
- java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
- java.util.Collections.singletonMap(topicIdPartition, future),
- java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
- remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus),
- fetchParams,
- Seq(topicIdPartition -> logReadInfo),
- replicaManager,
- callback))
- }
-
- @Test
- def testNotLeaderOrFollower(): Unit = {
- var actualTopicPartition: Option[TopicIdPartition] = None
- var fetchResultOpt: Option[FetchPartitionData] = None
-
- def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
- assertEquals(1, responses.size)
- actualTopicPartition = Some(responses.head._1)
- fetchResultOpt = Some(responses.head._2)
- }
-
- // throw exception while getPartition
- when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
- .thenThrow(new NotLeaderOrFollowerException(s"Replica for $topicIdPartition not available"))
-
- val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
-
- val logReadInfo = buildReadResult(Errors.NONE)
-
- val delayedRemoteFetch = new DelayedRemoteFetch(
- java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
- java.util.Collections.singletonMap(topicIdPartition, future),
- java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
- remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus),
- fetchParams,
- Seq(topicIdPartition -> logReadInfo),
- replicaManager,
- callback)
-
- // delayed remote fetch should still be able to complete
- assertTrue(delayedRemoteFetch.tryComplete())
- assertTrue(delayedRemoteFetch.isCompleted)
- assertEquals(topicIdPartition, actualTopicPartition.get)
- assertTrue(fetchResultOpt.isDefined)
- }
-
- @Test
- def testErrorLogReadInfo(): Unit = {
- var actualTopicPartition: Option[TopicIdPartition] = None
- var fetchResultOpt: Option[FetchPartitionData] = None
-
- def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
- assertEquals(1, responses.size)
- actualTopicPartition = Some(responses.head._1)
- fetchResultOpt = Some(responses.head._2)
- }
-
- when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
- .thenReturn(mock(classOf[Partition]))
-
- val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- future.complete(buildRemoteReadResult(Errors.NONE))
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
-
- // build a read result with error
- val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
-
- val delayedRemoteFetch = new DelayedRemoteFetch(
- java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
- java.util.Collections.singletonMap(topicIdPartition, future),
- java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
- remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus),
- fetchParams,
- Seq(topicIdPartition -> logReadInfo),
- replicaManager,
- callback)
-
- assertTrue(delayedRemoteFetch.tryComplete())
- assertTrue(delayedRemoteFetch.isCompleted)
- assertEquals(topicIdPartition, actualTopicPartition.get)
- assertTrue(fetchResultOpt.isDefined)
- assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResultOpt.get.error)
- }
-
- @Test
- def testRequestExpiry(): Unit = {
- val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
-
- def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
- responseSeq.foreach { case (tp, data) =>
- responses.put(tp, data)
- }
- }
-
- def expiresPerSecValue(): Double = {
- val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
- val metric = allMetrics.find { case (n, _) => n.getMBeanName.endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec") }
-
- if (metric.isEmpty)
- 0
- else
- metric.get._2.asInstanceOf[Meter].count
- }
-
- val remoteFetchTaskExpired = mock(classOf[Future[Void]])
- val remoteFetchTask2 = mock(classOf[Future[Void]])
- // complete the 2nd task, and keep the 1st one expired
- when(remoteFetchTask2.isDone).thenReturn(true)
-
- // Create futures - one completed, one not
- val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- // Only complete one remote fetch
- future2.complete(buildRemoteReadResult(Errors.NONE))
-
- val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
- val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
-
- val highWatermark = 100
- val leaderLogStartOffset = 10
-
- val logReadInfo1 = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
- val logReadInfo2 = buildReadResult(Errors.NONE)
-
- val fetchStatus1 = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
- fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
- val fetchStatus2 = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
- fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
-
- // Set up maps for multiple partitions
- val remoteFetchTasks = new java.util.HashMap[TopicIdPartition, Future[Void]]()
- val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
- val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
-
- remoteFetchTasks.put(topicIdPartition, remoteFetchTaskExpired)
- remoteFetchTasks.put(topicIdPartition2, remoteFetchTask2)
- remoteFetchResults.put(topicIdPartition, future1)
- remoteFetchResults.put(topicIdPartition2, future2)
- remoteFetchInfos.put(topicIdPartition, fetchInfo1)
- remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
-
- val delayedRemoteFetch = new DelayedRemoteFetch(
- remoteFetchTasks,
- remoteFetchResults,
- remoteFetchInfos,
- remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
- fetchParams,
- Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
- replicaManager,
- callback)
-
- when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
- .thenReturn(mock(classOf[Partition]))
- when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
- .thenReturn(mock(classOf[Partition]))
-
- // Verify that the ExpiresPerSec metric is zero before fetching
- val existingMetricVal = expiresPerSecValue()
- // Verify the delayedRemoteFetch is not completed yet
- assertFalse(delayedRemoteFetch.isCompleted)
-
- // Force the delayed remote fetch to expire
- delayedRemoteFetch.run()
-
- // Check that the expired task was cancelled and force-completed
- verify(remoteFetchTaskExpired).cancel(anyBoolean())
- verify(remoteFetchTask2, never()).cancel(anyBoolean())
- assertTrue(delayedRemoteFetch.isCompleted)
-
- // Check that the ExpiresPerSec metric was incremented
- assertTrue(expiresPerSecValue() > existingMetricVal)
-
- // Fetch results should include 2 results and the expired one should return local read results
- assertEquals(2, responses.size)
- assertTrue(responses.contains(topicIdPartition))
- assertTrue(responses.contains(topicIdPartition2))
-
- assertEquals(Errors.NONE, responses(topicIdPartition).error)
- assertEquals(highWatermark, responses(topicIdPartition).highWatermark)
- assertEquals(leaderLogStartOffset, responses(topicIdPartition).logStartOffset)
-
- assertEquals(Errors.NONE, responses(topicIdPartition2).error)
- }
-
- @Test
- def testMultiplePartitions(): Unit = {
- val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
-
- def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
- responseSeq.foreach { case (tp, data) =>
- responses.put(tp, data)
- }
- }
-
- // Create futures - one completed, one not
- val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- // Only complete one remote fetch
- future1.complete(buildRemoteReadResult(Errors.NONE))
-
- val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
- val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
-
- val highWatermark1 = 100
- val leaderLogStartOffset1 = 10
- val highWatermark2 = 200
- val leaderLogStartOffset2 = 20
-
- val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
- val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
-
- val fetchStatus1 = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
- fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
- val fetchStatus2 = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
- fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
-
- // Set up maps for multiple partitions
- val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
- val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
-
- remoteFetchResults.put(topicIdPartition, future1)
- remoteFetchResults.put(topicIdPartition2, future2)
- remoteFetchInfos.put(topicIdPartition, fetchInfo1)
- remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
-
- val delayedRemoteFetch = new DelayedRemoteFetch(
- Collections.emptyMap[TopicIdPartition, Future[Void]](),
- remoteFetchResults,
- remoteFetchInfos,
- remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
- fetchParams,
- Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
- replicaManager,
- callback)
-
- when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
- .thenReturn(mock(classOf[Partition]))
- when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
- .thenReturn(mock(classOf[Partition]))
-
- // Should not complete since future2 is not done
- assertFalse(delayedRemoteFetch.tryComplete())
- assertFalse(delayedRemoteFetch.isCompleted)
-
- // Complete future2
- future2.complete(buildRemoteReadResult(Errors.NONE))
-
- // Now it should complete
- assertTrue(delayedRemoteFetch.tryComplete())
- assertTrue(delayedRemoteFetch.isCompleted)
-
- // Verify both partitions were processed without error
- assertEquals(2, responses.size)
- assertTrue(responses.contains(topicIdPartition))
- assertTrue(responses.contains(topicIdPartition2))
-
- assertEquals(Errors.NONE, responses(topicIdPartition).error)
- assertEquals(highWatermark1, responses(topicIdPartition).highWatermark)
- assertEquals(leaderLogStartOffset1, responses(topicIdPartition).logStartOffset)
-
- assertEquals(Errors.NONE, responses(topicIdPartition2).error)
- assertEquals(highWatermark2, responses(topicIdPartition2).highWatermark)
- assertEquals(leaderLogStartOffset2, responses(topicIdPartition2).logStartOffset)
- }
-
- @Test
- def testMultiplePartitionsWithFailedResults(): Unit = {
- val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
-
- def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
- responseSeq.foreach { case (tp, data) =>
- responses.put(tp, data)
- }
- }
-
- // Create futures - one successful, one with error
- val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
- val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-
- // Created 1 successful result and 1 failed result
- future1.complete(buildRemoteReadResult(Errors.NONE))
- future2.complete(buildRemoteReadResult(Errors.UNKNOWN_SERVER_ERROR))
-
- val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
- val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
-
- val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
- val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
-
- val fetchStatus1 = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
- fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
- val fetchStatus2 = FetchPartitionStatus(
- startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
- fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
-
- // Set up maps for multiple partitions
- val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
- val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
-
- remoteFetchResults.put(topicIdPartition, future1)
- remoteFetchResults.put(topicIdPartition2, future2)
- remoteFetchInfos.put(topicIdPartition, fetchInfo1)
- remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
-
- val delayedRemoteFetch = new DelayedRemoteFetch(
- Collections.emptyMap[TopicIdPartition, Future[Void]](),
- remoteFetchResults,
- remoteFetchInfos,
- remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
- fetchParams,
- Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
- replicaManager,
- callback)
-
- when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
- .thenReturn(mock(classOf[Partition]))
- when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
- .thenReturn(mock(classOf[Partition]))
-
- assertTrue(delayedRemoteFetch.tryComplete())
- assertTrue(delayedRemoteFetch.isCompleted)
-
- // Verify both partitions were processed
- assertEquals(2, responses.size)
- assertTrue(responses.contains(topicIdPartition))
- assertTrue(responses.contains(topicIdPartition2))
-
- // First partition should be successful
- val fetchResult1 = responses(topicIdPartition)
- assertEquals(Errors.NONE, fetchResult1.error)
-
- // Second partition should have an error due to remote fetch failure
- val fetchResult2 = responses(topicIdPartition2)
- assertEquals(Errors.UNKNOWN_SERVER_ERROR, fetchResult2.error)
- }
-
- private def buildFetchParams(replicaId: Int, maxWaitMs: Int): FetchParams = {
- new FetchParams(
- replicaId,
- 1,
- maxWaitMs,
- 1,
- maxBytes,
- FetchIsolation.LOG_END,
- Optional.empty()
- )
- }
-
- private def buildReadResult(error: Errors,
- highWatermark: Int = 0,
- leaderLogStartOffset: Int = 0): LogReadResult = {
- new LogReadResult(
- new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY, false, Optional.empty(),
- Optional.of(mock(classOf[RemoteStorageFetchInfo]))),
- Optional.empty(),
- highWatermark,
- leaderLogStartOffset,
- -1L,
- -1L,
- -1L,
- OptionalLong.empty(),
- if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
- }
-
- private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = {
- new RemoteLogReadResult(
- Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)),
- if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
- }
-}
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 8f10811091d..366645344ae 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.RequestLocal
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, TopicPartitionOperationKey}
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a7948ae901f..307afad4f5f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot, UnifiedLog}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong}
@@ -171,7 +171,7 @@ class ReplicaManagerQuotasTest {
when(partition.getReplica(1)).thenReturn(None)
val tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("t1", 0))
- val fetchPartitionStatus = FetchPartitionStatus(
+ val fetchPartitionStatus = new FetchPartitionStatus(
new LogOffsetMetadata(50L, 0L, 250),
new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
val fetchParams = new FetchParams(
@@ -222,7 +222,7 @@ class ReplicaManagerQuotasTest {
.thenReturn(partition)
val tidp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("t1", 0))
- val fetchPartitionStatus = FetchPartitionStatus(
+ val fetchPartitionStatus = new FetchPartitionStatus(
new LogOffsetMetadata(50L, 0L, 250),
new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
val fetchParams = new FetchParams(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index fa43b8bc1a4..d0ebe4b2025 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -63,8 +63,8 @@ import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.{LogReadResult, PartitionFetchState}
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets}
+import org.apache.kafka.server.PartitionFetchState
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets}
import org.apache.kafka.server.share.SharePartitionKey
import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch}
import org.apache.kafka.server.share.metrics.ShareGroupMetrics
@@ -76,7 +76,7 @@ import org.apache.kafka.server.util.timer.{MockTimer, SystemTimer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadResult, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -3575,13 +3575,13 @@ class ReplicaManagerTest {
mock(classOf[FetchDataInfo])
}).when(spyRLM).read(any())
- val curExpiresPerSec = DelayedRemoteFetchMetrics.expiredRequestMeter.count()
+ val curExpiresPerSec = DelayedRemoteFetch.expiredRequestCount()
replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, fetchCallback)
// advancing the clock to expire the delayed remote fetch
timer.advanceClock(2000L)
// verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is called since the delayed remote fetch is expired
- TestUtils.waitUntilTrue(() => (curExpiresPerSec + 1) == DelayedRemoteFetchMetrics.expiredRequestMeter.count(), "DelayedRemoteFetchMetrics.expiredRequestMeter.count() should be 1, but got: " + DelayedRemoteFetchMetrics.expiredRequestMeter.count(), 10000L)
+ TestUtils.waitUntilTrue(() => (curExpiresPerSec + 1) == DelayedRemoteFetch.expiredRequestCount(), "DelayedRemoteFetchMetrics.expiredRequestMeter.count() should be 1, but got: " + DelayedRemoteFetch.expiredRequestCount(), 10000L)
latch.countDown()
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]](
diff --git a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java
new file mode 100644
index 00000000000..8c872811eb6
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.FetchPartitionStatus;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+
+import com.yammer.metrics.core.Meter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * A remote fetch operation that can be created by the replica manager and
watched
+ * in the remote fetch operation purgatory
+ */
+public class DelayedRemoteFetch extends DelayedOperation {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DelayedRemoteFetch.class);
+
+ // For compatibility, metrics remain registered under the historical
+ // `kafka.server.DelayedRemoteFetchMetrics` group name
+ private static final KafkaMetricsGroup METRICS_GROUP = new
KafkaMetricsGroup("kafka.server", "DelayedRemoteFetchMetrics");
+
+ private static final Meter EXPIRED_REQUEST_METER =
METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS);
+
+ private final Map<TopicIdPartition, Future<Void>> remoteFetchTasks;
+ private final Map<TopicIdPartition,
CompletableFuture<RemoteLogReadResult>> remoteFetchResults;
+ private final Map<TopicIdPartition, RemoteStorageFetchInfo>
remoteFetchInfos;
+ private final Map<TopicIdPartition, FetchPartitionStatus>
fetchPartitionStatus;
+ private final FetchParams fetchParams;
+ private final Map<TopicIdPartition, LogReadResult> localReadResults;
+ private final Consumer<TopicPartition> partitionOrException;
+ private final Consumer<Map<TopicIdPartition, FetchPartitionData>>
responseCallback;
+
+ public DelayedRemoteFetch(Map<TopicIdPartition, Future<Void>>
remoteFetchTasks,
+ Map<TopicIdPartition,
CompletableFuture<RemoteLogReadResult>> remoteFetchResults,
+ Map<TopicIdPartition, RemoteStorageFetchInfo>
remoteFetchInfos,
+ long remoteFetchMaxWaitMs,
+ Map<TopicIdPartition, FetchPartitionStatus>
fetchPartitionStatus,
+ FetchParams fetchParams,
+ Map<TopicIdPartition, LogReadResult>
localReadResults,
+ Consumer<TopicPartition> partitionOrException,
+ Consumer<Map<TopicIdPartition,
FetchPartitionData>> responseCallback) {
+ super(remoteFetchMaxWaitMs);
+ this.remoteFetchTasks = remoteFetchTasks;
+ this.remoteFetchResults = remoteFetchResults;
+ this.remoteFetchInfos = remoteFetchInfos;
+ this.fetchPartitionStatus = fetchPartitionStatus;
+ this.fetchParams = fetchParams;
+ this.localReadResults = localReadResults;
+ this.partitionOrException = partitionOrException;
+ this.responseCallback = responseCallback;
+
+ if (fetchParams.isFromFollower()) {
+ throw new IllegalStateException("The follower should not invoke
remote fetch. Fetch params are: " + fetchParams);
+ }
+ }
+
+ /**
+ * The operation can be completed if:
+ * <p>
+ * Case a: This broker is no longer the leader of the partition it tries
to fetch
+ * <p>
+ * Case b: This broker does not know the partition it tries to fetch
+ * <p>
+ * Case c: All the remote storage read requests completed (succeeded or
failed)
+ * <p>
+ * Case d: The partition is in an offline log directory on this broker
+ *
+ * Upon completion, should return whatever data is available for each
valid partition
+ */
+ @Override
+ public boolean tryComplete() {
+ for (Map.Entry<TopicIdPartition, FetchPartitionStatus> entry :
fetchPartitionStatus.entrySet()) {
+ TopicIdPartition topicPartition = entry.getKey();
+ FetchPartitionStatus fetchStatus = entry.getValue();
+ LogOffsetMetadata fetchOffset = fetchStatus.startOffsetMetadata();
+ try {
+ if
(!fetchOffset.equals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)) {
+
partitionOrException.accept(topicPartition.topicPartition());
+ }
+ } catch (KafkaStorageException e) { // Case d
+ LOG.debug("Partition {} is in an offline log directory,
satisfy {} immediately.", topicPartition, fetchParams);
+ return forceComplete();
+ } catch (UnknownTopicOrPartitionException e) { // Case b
+ LOG.debug("Broker no longer knows of partition {}, satisfy {}
immediately", topicPartition, fetchParams);
+ return forceComplete();
+ } catch (NotLeaderOrFollowerException e) { // Case a
+ LOG.debug("Broker is no longer the leader or follower of {},
satisfy {} immediately", topicPartition, fetchParams);
+ return forceComplete();
+ }
+ }
+
+ // Case c
+ if
(remoteFetchResults.values().stream().allMatch(CompletableFuture::isDone)) {
+ return forceComplete();
+ }
+ return false;
+ }
+
+ @Override
+ public void onExpiration() {
+ // Cancel the remote storage read task if it has not started yet, but
+ // avoid interrupting a running task, since interruption may force-close
+ // opened/cached resources such as the transaction index.
+ remoteFetchTasks.forEach((topicIdPartition, task) -> {
+ if (task != null && !task.isDone() && !task.cancel(false)) {
+ LOG.debug("Remote fetch task for remoteFetchInfo: {} could not
be cancelled.", remoteFetchInfos.get(topicIdPartition));
+ }
+ });
+
+ EXPIRED_REQUEST_METER.mark();
+ }
+
+ /**
+ * Upon completion, read whatever data is available and pass to the
complete callback
+ */
+ @Override
+ public void onComplete() {
+ Map<TopicIdPartition, FetchPartitionData> fetchPartitionData = new
LinkedHashMap<>();
+ localReadResults.forEach((tpId, result) -> {
+ CompletableFuture<RemoteLogReadResult> remoteFetchResult =
remoteFetchResults.get(tpId);
+ if (remoteFetchResults.containsKey(tpId)
+ && remoteFetchResult.isDone()
+ && result.error() == Errors.NONE
+ && result.info().delayedRemoteStorageFetch.isPresent()) {
+
+ if (remoteFetchResult.join().error().isPresent()) {
+ fetchPartitionData.put(tpId,
+ new
LogReadResult(Errors.forException(remoteFetchResult.join().error().get())).toFetchPartitionData(false));
+ } else {
+ FetchDataInfo info =
remoteFetchResult.join().fetchDataInfo().get();
+ fetchPartitionData.put(tpId,
+ new FetchPartitionData(
+ result.error(),
+ result.highWatermark(),
+ result.leaderLogStartOffset(),
+ info.records,
+ Optional.empty(),
+ result.lastStableOffset(),
+ info.abortedTransactions,
+ result.preferredReadReplica(),
+ false));
+ }
+ } else {
+ fetchPartitionData.put(tpId,
result.toFetchPartitionData(false));
+ }
+ });
+
+ responseCallback.accept(fetchPartitionData);
+ }
+
+ // Visible for testing
+ public static long expiredRequestCount() {
+ return EXPIRED_REQUEST_METER.count();
+ }
+}
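[Editor's note] A hedged sketch (not part of this patch) of how
replica-manager-style code might watch the new operation; the purgatory type
is the existing org.apache.kafka.server.purgatory.DelayedOperationPurgatory,
while the partition check and callback bodies are illustrative stand-ins:

    // Hedged sketch: build a DelayedRemoteFetch and hand it to a purgatory.
    void watchRemoteFetch(DelayedOperationPurgatory<DelayedRemoteFetch> purgatory,
                          List<DelayedOperationKey> delayedFetchKeys,
                          Map<TopicIdPartition, Future<Void>> tasks,
                          Map<TopicIdPartition, CompletableFuture<RemoteLogReadResult>> results,
                          Map<TopicIdPartition, RemoteStorageFetchInfo> infos,
                          Map<TopicIdPartition, FetchPartitionStatus> statuses,
                          FetchParams params,           // must not be a follower fetch
                          Map<TopicIdPartition, LogReadResult> localReads,
                          Consumer<Map<TopicIdPartition, FetchPartitionData>> callback) {
        DelayedRemoteFetch op = new DelayedRemoteFetch(
            tasks, results, infos, /* remoteFetchMaxWaitMs */ 500L,
            statuses, params, localReads,
            tp -> { /* throw if not leader / unknown partition / offline dir */ },
            callback);
        // Completes immediately when all remote reads are already done
        // (case c above); otherwise registers against the watch keys.
        purgatory.tryCompleteElseWatch(op, delayedFetchKeys);
    }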
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java
new file mode 100644
index 00000000000..5a060013b5e
--- /dev/null
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+
+/**
+ * A class containing log offset metadata and fetch info for a topic partition.
+ */
+public record FetchPartitionStatus(
+ LogOffsetMetadata startOffsetMetadata,
+ PartitionData fetchInfo
+) {
+}
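[Editor's note] Being a plain record, FetchPartitionStatus is constructed
directly and exposes generated accessors; a minimal sketch with illustrative
values (offset 500, leader epoch 10):

    // Hedged sketch: status for a partition fetched from offset 500;
    // startOffsetMetadata() and fetchInfo() are record-generated accessors.
    FetchPartitionStatus status = new FetchPartitionStatus(
        new LogOffsetMetadata(500L),
        new FetchRequest.PartitionData(Uuid.ZERO_UUID, 500L, 0L, 1024,
            Optional.of(10)));
    LogOffsetMetadata start = status.startOffsetMetadata();
    FetchRequest.PartitionData info = status.fetchInfo();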
diff --git a/server/src/main/java/org/apache/kafka/server/LogReadResult.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadResult.java
similarity index 59%
rename from server/src/main/java/org/apache/kafka/server/LogReadResult.java
rename to
storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadResult.java
index 203654391af..057a66a25c1 100644
--- a/server/src/main/java/org/apache/kafka/server/LogReadResult.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadResult.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.server;
+package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.server.storage.log.FetchPartitionData;
-import org.apache.kafka.storage.internals.log.FetchDataInfo;
import java.util.Optional;
import java.util.OptionalInt;
@@ -38,7 +38,7 @@ import java.util.OptionalLong;
* @param fetchTimeMs The time the fetch was received
* @param lastStableOffset Current LSO or None if the result has an exception
* @param preferredReadReplica the preferred read replica to be used for
future fetches
- * @param exception Exception if error encountered while reading from the log
+ * @param error The error encountered while reading from the log, or Errors.NONE
*/
public record LogReadResult(
FetchDataInfo info,
@@ -50,21 +50,8 @@ public record LogReadResult(
long fetchTimeMs,
OptionalLong lastStableOffset,
OptionalInt preferredReadReplica,
- Optional<Throwable> exception
+ Errors error
) {
- public LogReadResult(
- FetchDataInfo info,
- Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
- long highWatermark,
- long leaderLogStartOffset,
- long leaderLogEndOffset,
- long followerLogStartOffset,
- long fetchTimeMs,
- OptionalLong lastStableOffset) {
- this(info, divergingEpoch, highWatermark, leaderLogStartOffset,
leaderLogEndOffset, followerLogStartOffset,
- fetchTimeMs, lastStableOffset, OptionalInt.empty(),
Optional.empty());
- }
-
public LogReadResult(
FetchDataInfo info,
Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
@@ -74,44 +61,21 @@ public record LogReadResult(
long followerLogStartOffset,
long fetchTimeMs,
OptionalLong lastStableOffset,
- Optional<Throwable> exception) {
- this(info, divergingEpoch, highWatermark, leaderLogStartOffset,
leaderLogEndOffset, followerLogStartOffset,
- fetchTimeMs, lastStableOffset, OptionalInt.empty(), exception);
- }
-
- public LogReadResult(
- FetchDataInfo info,
- Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
- long highWatermark,
- long leaderLogStartOffset,
- long leaderLogEndOffset,
- long followerLogStartOffset,
- long fetchTimeMs,
- OptionalLong lastStableOffset,
- OptionalInt preferredReadReplica) {
+ Errors error) {
this(info, divergingEpoch, highWatermark, leaderLogStartOffset,
leaderLogEndOffset, followerLogStartOffset,
- fetchTimeMs, lastStableOffset, preferredReadReplica,
Optional.empty());
- }
-
- public Errors error() {
- if (exception.isPresent()) {
- return Errors.forException(exception.get());
- }
- return Errors.NONE;
+ fetchTimeMs, lastStableOffset, OptionalInt.empty(), error);
}
- @Override
- public String toString() {
- return "LogReadResult(info=" + info +
- ", divergingEpoch=" + divergingEpoch +
- ", highWatermark=" + highWatermark +
- ", leaderLogStartOffset" + leaderLogStartOffset +
- ", leaderLogEndOffset" + leaderLogEndOffset +
- ", followerLogStartOffset" + followerLogStartOffset +
- ", fetchTimeMs=" + fetchTimeMs +
- ", preferredReadReplica=" + preferredReadReplica +
- ", lastStableOffset=" + lastStableOffset +
- ", error=" + error() + ")";
+ public LogReadResult(Errors error) {
+ this(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
MemoryRecords.EMPTY),
+ Optional.empty(),
+ UnifiedLog.UNKNOWN_OFFSET,
+ UnifiedLog.UNKNOWN_OFFSET,
+ UnifiedLog.UNKNOWN_OFFSET,
+ UnifiedLog.UNKNOWN_OFFSET,
+ -1L,
+ OptionalLong.empty(),
+ error);
}
public FetchPartitionData toFetchPartitionData(boolean
isReassignmentFetch) {
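[Editor's note] With the Optional<Throwable> exception field replaced by an
Errors value, error-path results collapse to the new single-argument
constructor; a minimal sketch of the round trip:

    // Hedged sketch: the error-only constructor fills UNKNOWN_OFFSET
    // sentinels and empty records, so a failed read converts directly into
    // a FetchPartitionData carrying just the error code.
    LogReadResult failed = new LogReadResult(Errors.UNKNOWN_SERVER_ERROR);
    FetchPartitionData data = failed.toFetchPartitionData(false);
    assert data.error == Errors.UNKNOWN_SERVER_ERROR;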
diff --git
a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteFetchTest.java
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteFetchTest.java
new file mode 100644
index 00000000000..d4fc0c3ef27
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteFetchTest.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.FetchPartitionStatus;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class DelayedRemoteFetchTest {
+ private final int maxBytes = 1024;
+ private final Consumer<TopicPartition> partitionOrException =
mock(Consumer.class);
+ private final TopicIdPartition topicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), 0, "topic");
+ private final TopicIdPartition topicIdPartition2 = new
TopicIdPartition(Uuid.randomUuid(), 0, "topic2");
+ private final long fetchOffset = 500L;
+ private final long logStartOffset = 0L;
+ private final Optional<Integer> currentLeaderEpoch = Optional.of(10);
+ private final int remoteFetchMaxWaitMs = 500;
+
+ private final FetchPartitionStatus fetchStatus = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset),
+ new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset,
logStartOffset, maxBytes, currentLeaderEpoch)
+ );
+ private final FetchParams fetchParams = buildFetchParams(-1, 500);
+
+ @Test
+ public void testFetch() {
+ AtomicReference<TopicIdPartition> actualTopicPartition = new
AtomicReference<>();
+ AtomicReference<FetchPartitionData> fetchResultOpt = new
AtomicReference<>();
+
+ Consumer<Map<TopicIdPartition, FetchPartitionData>> callback =
responses -> {
+ assertEquals(1, responses.size());
+ Map.Entry<TopicIdPartition, FetchPartitionData> entry =
responses.entrySet().iterator().next();
+ actualTopicPartition.set(entry.getKey());
+ fetchResultOpt.set(entry.getValue());
+ };
+
+ CompletableFuture<RemoteLogReadResult> future = new
CompletableFuture<>();
+ future.complete(buildRemoteReadResult(Errors.NONE));
+
+ RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null);
+ long highWatermark = 100L;
+ long leaderLogStartOffset = 10L;
+ LogReadResult logReadInfo = buildReadResult(Errors.NONE,
highWatermark, leaderLogStartOffset);
+
+ DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+ Map.of(),
+ Map.of(topicIdPartition, future),
+ Map.of(topicIdPartition, fetchInfo),
+ remoteFetchMaxWaitMs,
+ Map.of(topicIdPartition, fetchStatus),
+ fetchParams,
+ Map.of(topicIdPartition, logReadInfo),
+ partitionOrException,
+ callback
+ );
+
+ assertTrue(delayedRemoteFetch.tryComplete());
+ assertTrue(delayedRemoteFetch.isCompleted());
+ assertNotNull(actualTopicPartition.get());
+ assertEquals(topicIdPartition, actualTopicPartition.get());
+ assertNotNull(fetchResultOpt.get());
+
+ FetchPartitionData fetchResult = fetchResultOpt.get();
+ assertEquals(Errors.NONE, fetchResult.error);
+ assertEquals(highWatermark, fetchResult.highWatermark);
+ assertEquals(leaderLogStartOffset, fetchResult.logStartOffset);
+ }
+
+ @Test
+ public void testFollowerFetch() {
+ Consumer<Map<TopicIdPartition, FetchPartitionData>> callback =
responses -> {
+ assertEquals(1, responses.size());
+ };
+
+ CompletableFuture<RemoteLogReadResult> future = new
CompletableFuture<>();
+ future.complete(buildRemoteReadResult(Errors.NONE));
+ RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null);
+ LogReadResult logReadInfo = buildReadResult(Errors.NONE, 100L, 10L);
+
+ assertThrows(IllegalStateException.class, () ->
+ new DelayedRemoteFetch(
+ Map.of(),
+ Map.of(topicIdPartition, future),
+ Map.of(topicIdPartition, fetchInfo),
+ remoteFetchMaxWaitMs,
+ Map.of(topicIdPartition, fetchStatus),
+ buildFetchParams(1, 500),
+ Map.of(topicIdPartition, logReadInfo),
+ partitionOrException,
+ callback
+ ));
+ }
+
+ @Test
+ public void testNotLeaderOrFollower() {
+ AtomicReference<TopicIdPartition> actualTopicPartition = new
AtomicReference<>();
+ AtomicReference<FetchPartitionData> fetchResultOpt = new
AtomicReference<>();
+
+ Consumer<Map<TopicIdPartition, FetchPartitionData>> callback =
responses -> {
+ assertEquals(1, responses.size());
+ Map.Entry<TopicIdPartition, FetchPartitionData> entry =
responses.entrySet().iterator().next();
+ actualTopicPartition.set(entry.getKey());
+ fetchResultOpt.set(entry.getValue());
+ };
+
+ // make the partition check throw NotLeaderOrFollowerException
+ doThrow(new NotLeaderOrFollowerException(String.format("Replica for %s
not available", topicIdPartition)))
+
.when(partitionOrException).accept(topicIdPartition.topicPartition());
+
+ CompletableFuture<RemoteLogReadResult> future = new
CompletableFuture<>();
+ RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null);
+
+ LogReadResult logReadInfo = buildReadResult(Errors.NONE);
+
+ DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+ Map.of(),
+ Map.of(topicIdPartition, future),
+ Map.of(topicIdPartition, fetchInfo),
+ remoteFetchMaxWaitMs,
+ Map.of(topicIdPartition, fetchStatus),
+ fetchParams,
+ Map.of(topicIdPartition, logReadInfo),
+ partitionOrException,
+ callback
+ );
+
+ // delayed remote fetch should still be able to complete
+ assertTrue(delayedRemoteFetch.tryComplete());
+ assertTrue(delayedRemoteFetch.isCompleted());
+ assertEquals(topicIdPartition, actualTopicPartition.get());
+ assertNotNull(fetchResultOpt.get());
+ }
+
+ @Test
+ public void testErrorLogReadInfo() {
+ AtomicReference<TopicIdPartition> actualTopicPartition = new
AtomicReference<>();
+ AtomicReference<FetchPartitionData> fetchResultOpt = new
AtomicReference<>();
+
+ Consumer<Map<TopicIdPartition, FetchPartitionData>> callback =
responses -> {
+ assertEquals(1, responses.size());
+ Map.Entry<TopicIdPartition, FetchPartitionData> entry =
responses.entrySet().iterator().next();
+ actualTopicPartition.set(entry.getKey());
+ fetchResultOpt.set(entry.getValue());
+ };
+
+ CompletableFuture<RemoteLogReadResult> future = new
CompletableFuture<>();
+ future.complete(buildRemoteReadResult(Errors.NONE));
+
+ RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null);
+
+ // build a read result with error
+ LogReadResult logReadInfo =
buildReadResult(Errors.FENCED_LEADER_EPOCH);
+
+ DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+ Map.of(),
+ Map.of(topicIdPartition, future),
+ Map.of(topicIdPartition, fetchInfo),
+ remoteFetchMaxWaitMs,
+ Map.of(topicIdPartition, fetchStatus),
+ fetchParams,
+ Map.of(topicIdPartition, logReadInfo),
+ partitionOrException,
+ callback
+ );
+
+ assertTrue(delayedRemoteFetch.tryComplete());
+ assertTrue(delayedRemoteFetch.isCompleted());
+ assertEquals(topicIdPartition, actualTopicPartition.get());
+ assertNotNull(fetchResultOpt.get());
+ assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResultOpt.get().error);
+ }
+
+ private long expiresPerSecValue() {
+ Map<MetricName, Metric> allMetrics =
KafkaYammerMetrics.defaultRegistry().allMetrics();
+ return allMetrics.entrySet()
+ .stream()
+ .filter(e ->
e.getKey().getMBeanName().endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+ .findFirst()
+ .map(Map.Entry::getValue)
+ .filter(Meter.class::isInstance)
+ .map(Meter.class::cast)
+ .map(Meter::count)
+ .orElse(0L);
+ }
+
+ @Test
+ public void testRequestExpiry() {
+ Map<TopicIdPartition, FetchPartitionData> responses = new HashMap<>();
+
+ Consumer<Map<TopicIdPartition, FetchPartitionData>> callback =
responses::putAll;
+
+ Future<Void> remoteFetchTaskExpired = mock(Future.class);
+ Future<Void> remoteFetchTask2 = mock(Future.class);
+ // mark the 2nd task as done; leave the 1st incomplete so it is
cancelled on expiry
+ when(remoteFetchTask2.isDone()).thenReturn(true);
+
+ // Create futures - one completed, one not
+ CompletableFuture<RemoteLogReadResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<RemoteLogReadResult> future2 = new
CompletableFuture<>();
+ // Only complete one remote fetch
+ future2.complete(buildRemoteReadResult(Errors.NONE));
+
+ RemoteStorageFetchInfo fetchInfo1 = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null);
+ RemoteStorageFetchInfo fetchInfo2 = new RemoteStorageFetchInfo(0,
false, topicIdPartition2, null, null);
+
+ long highWatermark = 100L;
+ long leaderLogStartOffset = 10L;
+
+ LogReadResult logReadInfo1 = buildReadResult(Errors.NONE,
highWatermark, leaderLogStartOffset);
+ LogReadResult logReadInfo2 = buildReadResult(Errors.NONE);
+
+ FetchPartitionStatus fetchStatus1 = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset),
+ new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset,
logStartOffset, maxBytes, currentLeaderEpoch));
+
+ FetchPartitionStatus fetchStatus2 = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset + 100L),
+ new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100L,
logStartOffset, maxBytes, currentLeaderEpoch));
+
+ // Set up maps for multiple partitions
+ Map<TopicIdPartition, Future<Void>> remoteFetchTasks =
Map.of(topicIdPartition, remoteFetchTaskExpired, topicIdPartition2,
remoteFetchTask2);
+ Map<TopicIdPartition, CompletableFuture<RemoteLogReadResult>>
remoteFetchResults = Map.of(topicIdPartition, future1, topicIdPartition2,
future2);
+ Map<TopicIdPartition, RemoteStorageFetchInfo> remoteFetchInfos =
Map.of(topicIdPartition, fetchInfo1, topicIdPartition2, fetchInfo2);
+
+ DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+ remoteFetchTasks,
+ remoteFetchResults,
+ remoteFetchInfos,
+ remoteFetchMaxWaitMs,
+ Map.of(topicIdPartition, fetchStatus1, topicIdPartition2,
fetchStatus2),
+ fetchParams,
+ Map.of(topicIdPartition, logReadInfo1, topicIdPartition2,
logReadInfo2),
+ partitionOrException,
+ callback
+ );
+
+ // Record the current ExpiresPerSec metric value before expiry
+ long existingMetricVal = expiresPerSecValue();
+ // Verify the delayedRemoteFetch is not completed yet
+ assertFalse(delayedRemoteFetch.isCompleted());
+
+ // Force the delayed remote fetch to expire
+ delayedRemoteFetch.run();
+
+ // Check that the expired task was cancelled and force-completed
+ verify(remoteFetchTaskExpired).cancel(anyBoolean());
+ verify(remoteFetchTask2, never()).cancel(anyBoolean());
+ assertTrue(delayedRemoteFetch.isCompleted());
+
+ // Check that the ExpiresPerSec metric was incremented
+ assertTrue(expiresPerSecValue() > existingMetricVal);
+
+ // Fetch results should include both partitions, and the expired one
should fall back to its local read result
+ assertEquals(2, responses.size());
+ assertTrue(responses.containsKey(topicIdPartition));
+ assertTrue(responses.containsKey(topicIdPartition2));
+
+ assertEquals(Errors.NONE, responses.get(topicIdPartition).error);
+ assertEquals(highWatermark,
responses.get(topicIdPartition).highWatermark);
+ assertEquals(leaderLogStartOffset,
responses.get(topicIdPartition).logStartOffset);
+
+ assertEquals(Errors.NONE, responses.get(topicIdPartition2).error);
+ }
+
+ @Test
+ public void testMultiplePartitions() {
+ Map<TopicIdPartition, FetchPartitionData> responses = new HashMap<>();
+
+ Consumer<Map<TopicIdPartition, FetchPartitionData>> callback =
responses::putAll;
+
+ // Create futures - one completed, one not
+ CompletableFuture<RemoteLogReadResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<RemoteLogReadResult> future2 = new
CompletableFuture<>();
+ // Only complete one remote fetch
+ future1.complete(buildRemoteReadResult(Errors.NONE));
+
+ RemoteStorageFetchInfo fetchInfo1 = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null);
+ RemoteStorageFetchInfo fetchInfo2 = new RemoteStorageFetchInfo(0,
false, topicIdPartition2, null, null);
+
+ long highWatermark1 = 100L;
+ long leaderLogStartOffset1 = 10L;
+ long highWatermark2 = 200L;
+ long leaderLogStartOffset2 = 20L;
+
+ LogReadResult logReadInfo1 = buildReadResult(Errors.NONE,
highWatermark1, leaderLogStartOffset1);
+ LogReadResult logReadInfo2 = buildReadResult(Errors.NONE,
highWatermark2, leaderLogStartOffset2);
+
+ FetchPartitionStatus fetchStatus1 = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset),
+ new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset,
logStartOffset, maxBytes, currentLeaderEpoch));
+
+ FetchPartitionStatus fetchStatus2 = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset + 100L),
+ new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100L,
logStartOffset, maxBytes, currentLeaderEpoch));
+
+ DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+ Map.of(),
+ Map.of(topicIdPartition, future1, topicIdPartition2, future2),
+ Map.of(topicIdPartition, fetchInfo1, topicIdPartition2,
fetchInfo2),
+ remoteFetchMaxWaitMs,
+ Map.of(topicIdPartition, fetchStatus1, topicIdPartition2,
fetchStatus2),
+ fetchParams,
+ Map.of(topicIdPartition, logReadInfo1, topicIdPartition2,
logReadInfo2),
+ partitionOrException,
+ callback
+ );
+
+ // Should not complete since future2 is not done
+ assertFalse(delayedRemoteFetch.tryComplete());
+ assertFalse(delayedRemoteFetch.isCompleted());
+
+ // Complete future2
+ future2.complete(buildRemoteReadResult(Errors.NONE));
+
+ // Now it should complete
+ assertTrue(delayedRemoteFetch.tryComplete());
+ assertTrue(delayedRemoteFetch.isCompleted());
+
+ // Verify both partitions were processed without error
+ assertEquals(2, responses.size());
+ assertTrue(responses.containsKey(topicIdPartition));
+ assertTrue(responses.containsKey(topicIdPartition2));
+
+ assertEquals(Errors.NONE, responses.get(topicIdPartition).error);
+ assertEquals(highWatermark1,
responses.get(topicIdPartition).highWatermark);
+ assertEquals(leaderLogStartOffset1,
responses.get(topicIdPartition).logStartOffset);
+
+ assertEquals(Errors.NONE, responses.get(topicIdPartition2).error);
+ assertEquals(highWatermark2,
responses.get(topicIdPartition2).highWatermark);
+ assertEquals(leaderLogStartOffset2,
responses.get(topicIdPartition2).logStartOffset);
+ }
+
+ @Test
+ public void testMultiplePartitionsWithFailedResults() {
+ Map<TopicIdPartition, FetchPartitionData> responses = new HashMap<>();
+
+ Consumer<Map<TopicIdPartition, FetchPartitionData>> callback =
responses::putAll;
+
+ // Create futures - one successful, one with error
+ CompletableFuture<RemoteLogReadResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<RemoteLogReadResult> future2 = new
CompletableFuture<>();
+
+ // Created 1 successful result and 1 failed result
+ future1.complete(buildRemoteReadResult(Errors.NONE));
+ future2.complete(buildRemoteReadResult(Errors.UNKNOWN_SERVER_ERROR));
+
+ RemoteStorageFetchInfo fetchInfo1 = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null);
+ RemoteStorageFetchInfo fetchInfo2 = new RemoteStorageFetchInfo(0,
false, topicIdPartition2, null, null);
+
+ LogReadResult logReadInfo1 = buildReadResult(Errors.NONE, 100, 10);
+ LogReadResult logReadInfo2 = buildReadResult(Errors.NONE, 100, 10);
+
+ FetchPartitionStatus fetchStatus1 = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset),
+ new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset,
logStartOffset, maxBytes, currentLeaderEpoch));
+
+ FetchPartitionStatus fetchStatus2 = new FetchPartitionStatus(
+ new LogOffsetMetadata(fetchOffset + 100),
+ new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100,
logStartOffset, maxBytes, currentLeaderEpoch));
+
+ DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+ Map.of(),
+ Map.of(topicIdPartition, future1, topicIdPartition2, future2),
+ Map.of(topicIdPartition, fetchInfo1, topicIdPartition2,
fetchInfo2),
+ remoteFetchMaxWaitMs,
+ Map.of(topicIdPartition, fetchStatus1, topicIdPartition2,
fetchStatus2),
+ fetchParams,
+ Map.of(topicIdPartition, logReadInfo1, topicIdPartition2,
logReadInfo2),
+ partitionOrException,
+ callback
+ );
+
+ assertTrue(delayedRemoteFetch.tryComplete());
+ assertTrue(delayedRemoteFetch.isCompleted());
+
+ // Verify both partitions were processed
+ assertEquals(2, responses.size());
+ assertTrue(responses.containsKey(topicIdPartition));
+ assertTrue(responses.containsKey(topicIdPartition2));
+
+ // First partition should be successful
+ FetchPartitionData fetchResult1 = responses.get(topicIdPartition);
+ assertEquals(Errors.NONE, fetchResult1.error);
+
+ // Second partition should have an error due to remote fetch failure
+ FetchPartitionData fetchResult2 = responses.get(topicIdPartition2);
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR, fetchResult2.error);
+ }
+
+ private FetchParams buildFetchParams(int replicaId, int maxWaitMs) {
+ return new FetchParams(
+ replicaId,
+ 1,
+ maxWaitMs,
+ 1,
+ maxBytes,
+ FetchIsolation.LOG_END,
+ Optional.empty()
+ );
+ }
+
+ private LogReadResult buildReadResult(Errors error) {
+ return buildReadResult(error, 0, 0);
+ }
+
+ private LogReadResult buildReadResult(Errors error, long highWatermark,
long leaderLogStartOffset) {
+ return new LogReadResult(
+ new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
MemoryRecords.EMPTY, false, Optional.empty(),
+ Optional.of(mock(RemoteStorageFetchInfo.class))),
+ Optional.empty(),
+ highWatermark,
+ leaderLogStartOffset,
+ -1L,
+ -1L,
+ -1L,
+ OptionalLong.empty(),
+ error);
+ }
+
+ private RemoteLogReadResult buildRemoteReadResult(Errors error) {
+ return new RemoteLogReadResult(
+ Optional.of(new
FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)),
+ error != Errors.NONE ? Optional.of(error.exception()) :
Optional.empty());
+ }
+}
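[Editor's note] For readers tracing the onComplete behaviour these tests
exercise, a condensed sketch of the per-partition decision; merge is an
illustrative helper, not a method of the new class:

    // Hedged sketch: a remote result replaces the local read only when the
    // remote read finished, the local read succeeded, and the local read
    // was flagged as a delayed remote-storage fetch; otherwise the local
    // read result is returned as-is.
    static FetchPartitionData merge(LogReadResult local,
                                    CompletableFuture<RemoteLogReadResult> remote) {
        boolean useRemote = remote != null && remote.isDone()
            && local.error() == Errors.NONE
            && local.info().delayedRemoteStorageFetch.isPresent();
        if (!useRemote)
            return local.toFetchPartitionData(false);
        RemoteLogReadResult r = remote.join();
        if (r.error().isPresent())
            return new LogReadResult(Errors.forException(r.error().get()))
                .toFetchPartitionData(false);
        FetchDataInfo info = r.fetchDataInfo().get();
        return new FetchPartitionData(local.error(), local.highWatermark(),
            local.leaderLogStartOffset(), info.records, Optional.empty(),
            local.lastStableOffset(), info.abortedTransactions,
            local.preferredReadReplica(), false);
    }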