This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7591868aead KAFKA-18179: Move AsyncOffsetReadFutureHolder to storage
module (#18095)
7591868aead is described below
commit 7591868aead54fff7d5e8a44c5e06746ed34866b
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Dec 11 10:56:47 2024 +0100
KAFKA-18179: Move AsyncOffsetReadFutureHolder to storage module (#18095)
Reviewers: Christo Lolov <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 2 +-
.../main/scala/kafka/log/OffsetResultHolder.scala | 12 +--------
.../kafka/server/ListOffsetsPartitionStatus.scala | 2 +-
.../log/remote/RemoteLogOffsetReaderTest.java | 2 +-
.../server/DelayedRemoteListOffsetsTest.scala | 2 +-
.../internals/log/AsyncOffsetReadFutureHolder.java | 30 ++++++++++++++++++++++
6 files changed, 35 insertions(+), 15 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6e95e36020b..b2b1ab856c0 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -17,7 +17,6 @@
package kafka.log.remote;
import kafka.cluster.Partition;
-import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.log.UnifiedLog;
import kafka.server.DelayedRemoteListOffsets;
@@ -74,6 +73,7 @@ import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
diff --git a/core/src/main/scala/kafka/log/OffsetResultHolder.scala
b/core/src/main/scala/kafka/log/OffsetResultHolder.scala
index 64b78c6cee9..89951dbb96f 100644
--- a/core/src/main/scala/kafka/log/OffsetResultHolder.scala
+++ b/core/src/main/scala/kafka/log/OffsetResultHolder.scala
@@ -18,8 +18,7 @@ package kafka.log
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
-
-import java.util.concurrent.{CompletableFuture, Future}
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
case class OffsetResultHolder(timestampAndOffsetOpt:
Option[TimestampAndOffset],
futureHolderOpt:
Option[AsyncOffsetReadFutureHolder[Either[Exception,
Option[TimestampAndOffset]]]] = None) {
@@ -27,12 +26,3 @@ case class OffsetResultHolder(timestampAndOffsetOpt:
Option[TimestampAndOffset],
var maybeOffsetsError: Option[ApiException] = None
var lastFetchableOffset: Option[Long] = None
}
-
-/**
- * A remote log offset read task future holder. It contains two futures:
- * 1. JobFuture - Use this future to cancel the running job.
- * 2. TaskFuture - Use this future to get the result of the job/computation.
- */
-case class AsyncOffsetReadFutureHolder[T](jobFuture: Future[Void], taskFuture:
CompletableFuture[T]) {
-
-}
diff --git a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
index 702d0a4ccb8..d9fb9e6d059 100644
--- a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
+++ b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
@@ -16,10 +16,10 @@
*/
package kafka.server
-import kafka.log.AsyncOffsetReadFutureHolder
import org.apache.kafka.common.errors.ApiException
import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
class ListOffsetsPartitionStatus(val futureHolderOpt:
Option[AsyncOffsetReadFutureHolder[Either[Exception,
Option[TimestampAndOffset]]]],
val lastFetchableOffset: Option[Long],
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
index ce027f8f915..1313c8e2898 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
@@ -16,7 +16,6 @@
*/
package kafka.log.remote;
-import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
@@ -28,6 +27,7 @@ import
org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
diff --git
a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
index 4061e6aaaf8..eaa45895959 100644
---
a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
@@ -16,7 +16,6 @@
*/
package kafka.server
-import kafka.log.AsyncOffsetReadFutureHolder
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse
@@ -25,6 +24,7 @@ import
org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory,
TopicPartitionOperationKey}
import org.apache.kafka.server.util.timer.MockTimer
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions.assertEquals
import org.mockito.ArgumentMatchers.anyBoolean
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java
new file mode 100644
index 00000000000..990a5ef67dd
--- /dev/null
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+/**
+ * A remote log offset read task future holder. It contains two futures:
+ * <ol>
+ * <li>JobFuture - Use this future to cancel the running job.
+ * <li>TaskFuture - Use this future to get the result of the job/computation.
+ * </ol>
+ */
+public record AsyncOffsetReadFutureHolder<T>(Future<Void> jobFuture,
CompletableFuture<T> taskFuture) {
+}