This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7591868aead KAFKA-18179: Move AsyncOffsetReadFutureHolder to storage module (#18095)
7591868aead is described below

commit 7591868aead54fff7d5e8a44c5e06746ed34866b
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Dec 11 10:56:47 2024 +0100

    KAFKA-18179: Move AsyncOffsetReadFutureHolder to storage module (#18095)
    
    Reviewers: Christo Lolov <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  2 +-
 .../main/scala/kafka/log/OffsetResultHolder.scala  | 12 +--------
 .../kafka/server/ListOffsetsPartitionStatus.scala  |  2 +-
 .../log/remote/RemoteLogOffsetReaderTest.java      |  2 +-
 .../server/DelayedRemoteListOffsetsTest.scala      |  2 +-
 .../internals/log/AsyncOffsetReadFutureHolder.java | 30 ++++++++++++++++++++++
 6 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6e95e36020b..b2b1ab856c0 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -17,7 +17,6 @@
 package kafka.log.remote;
 
 import kafka.cluster.Partition;
-import kafka.log.AsyncOffsetReadFutureHolder;
 import kafka.log.UnifiedLog;
 import kafka.server.DelayedRemoteListOffsets;
 
@@ -74,6 +73,7 @@ import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.internals.log.AbortedTxn;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
 import org.apache.kafka.storage.internals.log.EpochEntry;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
diff --git a/core/src/main/scala/kafka/log/OffsetResultHolder.scala b/core/src/main/scala/kafka/log/OffsetResultHolder.scala
index 64b78c6cee9..89951dbb96f 100644
--- a/core/src/main/scala/kafka/log/OffsetResultHolder.scala
+++ b/core/src/main/scala/kafka/log/OffsetResultHolder.scala
@@ -18,8 +18,7 @@ package kafka.log
 
 import org.apache.kafka.common.errors.ApiException
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
-
-import java.util.concurrent.{CompletableFuture, Future}
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
 
 case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset],
                               futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None) {
@@ -27,12 +26,3 @@ case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset],
   var maybeOffsetsError: Option[ApiException] = None
   var lastFetchableOffset: Option[Long] = None
 }
-
-/**
- * A remote log offset read task future holder. It contains two futures:
- * 1. JobFuture - Use this future to cancel the running job.
- * 2. TaskFuture - Use this future to get the result of the job/computation.
- */
-case class AsyncOffsetReadFutureHolder[T](jobFuture: Future[Void], taskFuture: CompletableFuture[T]) {
-
-}
diff --git a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
index 702d0a4ccb8..d9fb9e6d059 100644
--- a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
+++ b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
@@ -16,10 +16,10 @@
  */
 package kafka.server
 
-import kafka.log.AsyncOffsetReadFutureHolder
 import org.apache.kafka.common.errors.ApiException
 import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
 
 class ListOffsetsPartitionStatus(val futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]],
                                  val lastFetchableOffset: Option[Long],
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
index ce027f8f915..1313c8e2898 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
@@ -16,7 +16,6 @@
  */
 package kafka.log.remote;
 
-import kafka.log.AsyncOffsetReadFutureHolder;
 import kafka.utils.TestUtils;
 
 import org.apache.kafka.common.TopicPartition;
@@ -28,6 +27,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
index 4061e6aaaf8..eaa45895959 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
@@ -16,7 +16,6 @@
  */
 package kafka.server
 
-import kafka.log.AsyncOffsetReadFutureHolder
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse
@@ -25,6 +24,7 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.requests.ListOffsetsResponse
 import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
 import org.apache.kafka.server.util.timer.MockTimer
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.mockito.ArgumentMatchers.anyBoolean
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java
new file mode 100644
index 00000000000..990a5ef67dd
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+/**
+ * A remote log offset read task future holder. It contains two futures:
+ * <ol>
+ *   <li>JobFuture - Use this future to cancel the running job.
+ *   <li>TaskFuture - Use this future to get the result of the job/computation.
+ * </ol>
+ */
+public record AsyncOffsetReadFutureHolder<T>(Future<Void> jobFuture, CompletableFuture<T> taskFuture) {
+}

Reply via email to