This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4e75c57bbb79014a0efd3d396630c810506c5921
Author: TengYao Chi <[email protected]>
AuthorDate: Sat Aug 3 20:15:51 2024 +0800

    KAFKA-17245: Revert TopicRecord changes. (#16780)
    
    Revert the KAFKA-16257 changes because KIP-950 no longer needs them.
    
    Reviewers: Luke Chen <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  3 +--
 .../kafka/log/remote/RemoteLogManagerTest.java     | 12 ++++++------
 .../kafka/admin/RemoteTopicCrudTest.scala          |  2 +-
 .../kafka/log/remote/RemoteIndexCacheTest.scala    |  8 ++++----
 .../resources/common/metadata/TopicRecord.json     | 10 ++--------
 .../kafka/metadata/util/RecordRedactorTest.java    |  2 +-
 .../remote/storage/RemoteLogSegmentMetadata.java   | 22 ++++------------------
 .../storage/RemoteLogSegmentMetadataTest.java      |  8 +++-----
 .../RemoteLogSegmentMetadataTransform.java         |  2 +-
 .../remote/metadata/storage/ConsumerTaskTest.java  |  2 +-
 .../storage/RemoteLogMetadataCacheTest.java        |  4 ++--
 .../storage/RemoteLogMetadataFormatterTest.java    |  2 +-
 .../storage/RemoteLogMetadataSerdeTest.java        |  2 +-
 .../storage/RemoteLogMetadataTransformTest.java    |  2 +-
 .../storage/RemoteLogSegmentLifecycleTest.java     |  6 +++---
 ...ogMetadataManagerMultipleSubscriptionsTest.java |  4 ++--
 ...icBasedRemoteLogMetadataManagerRestartTest.java |  6 +++---
 .../TopicBasedRemoteLogMetadataManagerTest.java    | 20 ++++++++++----------
 .../RemoteLogSegmentMetadataTransformTest.java     |  3 ++-
 .../log/remote/storage/LocalTieredStorageTest.java |  4 ++--
 .../storage/RemoteLogMetadataManagerTest.java      |  4 ++--
 21 files changed, 53 insertions(+), 75 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index f3f0ccc11ad..7a087497bb6 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -913,7 +913,6 @@ public class RemoteLogManager implements Closeable {
             logger.info("Copying {} to remote storage.", logFileName);
 
             long endOffset = nextSegmentBaseOffset - 1;
-            int tieredEpoch = 0;
             File producerStateSnapshotFile = 
log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null);
 
             List<EpochEntry> epochEntries = getLeaderEpochEntries(log, 
segment.baseOffset(), nextSegmentBaseOffset);
@@ -922,7 +921,7 @@ public class RemoteLogManager implements Closeable {
 
             RemoteLogSegmentMetadata copySegmentStartedRlsm = new 
RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset,
                     segment.largestTimestamp(), brokerId, time.milliseconds(), 
segment.log().sizeInBytes(),
-                    segmentLeaderEpochs, tieredEpoch);
+                    segmentLeaderEpochs);
 
             
remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();
 
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4e5f70f9337..ce0939737b1 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -1566,7 +1566,7 @@ public class RemoteLogManagerTest {
                 100000L,
                 1000,
                 Optional.empty(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs, 0);
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
     }
 
     @Test
@@ -1906,9 +1906,9 @@ public class RemoteLogManagerTest {
         int segmentSize = 1024;
         List<RemoteLogSegmentMetadata> segmentMetadataList = Arrays.asList(
                 new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
-                        500, 539, timestamp, brokerId, timestamp, segmentSize, 
truncateAndGetLeaderEpochs(epochEntries, 500L, 539L), 0),
+                        500, 539, timestamp, brokerId, timestamp, segmentSize, 
truncateAndGetLeaderEpochs(epochEntries, 500L, 539L)),
                 new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
-                        540, 700, timestamp, brokerId, timestamp, segmentSize, 
truncateAndGetLeaderEpochs(epochEntries, 540L, 700L), 0)
+                        540, 700, timestamp, brokerId, timestamp, segmentSize, 
truncateAndGetLeaderEpochs(epochEntries, 540L, 700L))
                 );
         
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), 
anyInt()))
                 .thenAnswer(invocation -> {
@@ -2231,7 +2231,7 @@ public class RemoteLogManagerTest {
         RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
                 metadata1.startOffset(), metadata1.endOffset() + 5, 
metadata1.maxTimestampMs(),
                 metadata1.brokerId() + 1, metadata1.eventTimestampMs(), 
metadata1.segmentSizeInBytes() + 128,
-                metadata1.customMetadata(), metadata1.state(), 
metadata1.segmentLeaderEpochs(), 0);
+                metadata1.customMetadata(), metadata1.state(), 
metadata1.segmentLeaderEpochs());
 
         // When there are overlapping/duplicate segments, the 
RemoteLogMetadataManager#listRemoteLogSegments
         // returns the segments in order of (valid ++ unreferenced) segments:
@@ -2657,8 +2657,8 @@ public class RemoteLogManagerTest {
                     segmentSize,
                     Optional.empty(),
                     state,
-                    segmentLeaderEpochs,
-                0);
+                    segmentLeaderEpochs
+            );
             segmentMetadataList.add(metadata);
         }
         return segmentMetadataList;
diff --git 
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala 
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index e362832d638..f995b86b704 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -436,7 +436,7 @@ class MyRemoteLogMetadataManager extends 
NoOpRemoteLogMetadataManager {
       val startOffset = idx * recordsPerSegment
       val endOffset = startOffset + recordsPerSegment - 1
       val segmentLeaderEpochs: util.Map[Integer, java.lang.Long] = 
Collections.singletonMap(0, 0L)
-      segmentMetadataList.add(new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), startOffset, 
endOffset, timestamp, 0, timestamp, segmentSize, Optional.empty(), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentLeaderEpochs, 0))
+      segmentMetadataList.add(new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), startOffset, 
endOffset, timestamp, 0, timestamp, segmentSize, Optional.empty(), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentLeaderEpochs))
     }
     segmentMetadataList.iterator()
   }
diff --git 
a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala 
b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 95e28282f97..6ebcbeb4fcd 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -67,7 +67,7 @@ class RemoteIndexCacheTest {
     Files.createDirectory(tpDir.toPath)
 
     val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)
-    rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, 
lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L), 0)
+    rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, 
lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L))
 
     cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, 
tpDir.toString)
 
@@ -678,7 +678,7 @@ class RemoteIndexCacheTest {
   @Test
   def testConcurrentRemoveReadForCache(): Unit = {
     // Create a spy Cache Entry
-    val rlsMetadata = new 
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), 
baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), 
segmentSize, Collections.singletonMap(0, 0L), 0)
+    val rlsMetadata = new 
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), 
baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), 
segmentSize, Collections.singletonMap(0, 0L))
 
     val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
     val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
@@ -926,7 +926,7 @@ class RemoteIndexCacheTest {
 
   private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
                                     = 
RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
-    val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, 
baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), 
segmentSize, Collections.singletonMap(0, 0L), 0)
+    val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, 
baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), 
segmentSize, Collections.singletonMap(0, 0L))
     val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, tpDir))
     val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, tpDir))
     val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, 
tpDir))
@@ -993,7 +993,7 @@ class RemoteIndexCacheTest {
                                                tpId: TopicIdPartition): 
List[RemoteLogSegmentMetadata] = {
     val metadataList = mutable.Buffer.empty[RemoteLogSegmentMetadata]
     for (i <- 0 until size) {
-      metadataList.append(new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(tpId, Uuid.randomUuid()), baseOffset * i, baseOffset * i + 
10, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L), 0))
+      metadataList.append(new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(tpId, Uuid.randomUuid()), baseOffset * i, baseOffset * i + 
10, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L)))
     }
     metadataList.toList
   }
diff --git a/metadata/src/main/resources/common/metadata/TopicRecord.json 
b/metadata/src/main/resources/common/metadata/TopicRecord.json
index 5a316e6dbba..6fa5a05096d 100644
--- a/metadata/src/main/resources/common/metadata/TopicRecord.json
+++ b/metadata/src/main/resources/common/metadata/TopicRecord.json
@@ -17,18 +17,12 @@
   "apiKey": 2,
   "type": "metadata",
   "name": "TopicRecord",
-  // Version 0 first version of TopicRecord with Name and TopicId
-  // Version 1 adds TieredEpoch and TieredState for KIP-950
-  "validVersions": "0-1",
+  "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
       "about": "The topic name." },
     { "name": "TopicId", "type": "uuid", "versions": "0+",
-      "about": "The unique ID of this topic." },
-    { "name": "TieredEpoch", "type": "int32", "versions": "1+",
-      "about": "The epoch denoting how many times the tiered state has 
changed" },
-    { "name": "TieredState", "type": "bool", "versions": "1+",
-      "about": "Denotes whether the topic is currently tiered or not" }
+      "about": "The unique ID of this topic." }
   ]
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
index a5a18e582a0..0fbd4ef00ae 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
@@ -50,7 +50,7 @@ public final class RecordRedactorTest {
 
     @Test
     public void testTopicRecordToString() {
-        assertEquals("TopicRecord(name='foo', topicId=UOovKkohSU6AGdYW33ZUNg, 
tieredEpoch=0, tieredState=false)",
+        assertEquals("TopicRecord(name='foo', topicId=UOovKkohSU6AGdYW33ZUNg)",
                 REDACTOR.toLoggableString(new TopicRecord().
                     setTopicId(Uuid.fromString("UOovKkohSU6AGdYW33ZUNg")).
                     setName("foo")));
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 4389a90ab8a..9b589322bbf 100644
--- 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -78,11 +78,6 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
      */
     private final RemoteLogSegmentState state;
 
-    /**
-     * The epoch denoting how many times the tiered state has changed
-     */
-    private final int tieredEpoch;
-
     /**
      * Creates an instance with the given metadata of remote log segment.
      * <p>
@@ -99,7 +94,6 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
      * @param customMetadata      Custom metadata.
      * @param state               State of the respective segment of 
remoteLogSegmentId.
      * @param segmentLeaderEpochs leader epochs occurred within this segment.
-     * @param tieredEpoch         The epoch denoting how many times the tiered 
state has changed
      */
     public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
                                     long startOffset,
@@ -110,11 +104,10 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
                                     int segmentSizeInBytes,
                                     Optional<CustomMetadata> customMetadata,
                                     RemoteLogSegmentState state,
-                                    Map<Integer, Long> segmentLeaderEpochs, 
int tieredEpoch) {
+                                    Map<Integer, Long> segmentLeaderEpochs) {
         super(brokerId, eventTimestampMs);
         this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, 
"remoteLogSegmentId can not be null");
         this.state = Objects.requireNonNull(state, "state can not be null");
-        this.tieredEpoch = tieredEpoch;
 
         if (startOffset < 0) {
             throw new IllegalArgumentException("Unexpected start offset = " + 
startOffset + ". StartOffset for a remote segment cannot be negative");
@@ -151,7 +144,6 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
      * @param eventTimestampMs    Epoch time in milli seconds at which the 
remote log segment is copied to the remote tier storage.
      * @param segmentSizeInBytes  Size of this segment in bytes.
      * @param segmentLeaderEpochs leader epochs occurred within this segment
-     * @param tieredEpoch
      */
     public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
                                     long startOffset,
@@ -160,8 +152,7 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
                                     int brokerId,
                                     long eventTimestampMs,
                                     int segmentSizeInBytes,
-                                    Map<Integer, Long> segmentLeaderEpochs,
-                                    int tieredEpoch) {
+                                    Map<Integer, Long> segmentLeaderEpochs) {
         this(remoteLogSegmentId,
                 startOffset,
                 endOffset,
@@ -170,8 +161,7 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
                 eventTimestampMs, segmentSizeInBytes,
                 Optional.empty(),
                 RemoteLogSegmentState.COPY_SEGMENT_STARTED,
-                segmentLeaderEpochs,
-                tieredEpoch);
+                segmentLeaderEpochs);
     }
 
 
@@ -237,10 +227,6 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
         return state;
     }
 
-    public int tieredEpoch() {
-        return tieredEpoch;
-    }
-
     /**
      * Creates a new RemoteLogSegmentMetadata applying the given {@code 
rlsmUpdate} on this instance. This method will
      * not update this instance.
@@ -255,7 +241,7 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
 
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset,
                 endOffset, maxTimestampMs, rlsmUpdate.brokerId(), 
rlsmUpdate.eventTimestampMs(),
-                segmentSizeInBytes, rlsmUpdate.customMetadata(), 
rlsmUpdate.state(), segmentLeaderEpochs, tieredEpoch);
+                segmentSizeInBytes, rlsmUpdate.customMetadata(), 
rlsmUpdate.state(), segmentLeaderEpochs);
     }
 
     @Override
diff --git 
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
 
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
index 4e11f57e39a..0aaaa4d99e4 100644
--- 
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
+++ 
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
@@ -43,14 +43,13 @@ class RemoteLogSegmentMetadataTest {
         long endOffset = 100L;
         int segmentSize = 123;
         long maxTimestamp = -1L;
-        int tieredEpoch = 0;
 
         Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
         segmentLeaderEpochs.put(0, 0L);
         RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
         RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
                 maxTimestamp, brokerId, eventTimestamp, segmentSize,
-                segmentLeaderEpochs, tieredEpoch);
+                segmentLeaderEpochs);
 
         CustomMetadata customMetadata = new CustomMetadata(new byte[]{0, 1, 2, 
3});
         RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(
@@ -62,8 +61,8 @@ class RemoteLogSegmentMetadataTest {
                 segmentId, startOffset, endOffset,
                 maxTimestamp, brokerIdFinished, timestampFinished, 
segmentSize, Optional.of(customMetadata),
                 RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
-                segmentLeaderEpochs,
-                tieredEpoch);
+                segmentLeaderEpochs
+        );
         assertEquals(expectedUpdatedMetadata, updatedMetadata);
 
         // Check that the original metadata have not changed.
@@ -75,6 +74,5 @@ class RemoteLogSegmentMetadataTest {
         assertEquals(eventTimestamp, segmentMetadata.eventTimestampMs());
         assertEquals(segmentSize, segmentMetadata.segmentSizeInBytes());
         assertEquals(segmentLeaderEpochs, 
segmentMetadata.segmentLeaderEpochs());
-        assertEquals(tieredEpoch, segmentMetadata.tieredEpoch());
     }
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index f97ca86b9ff..9e893d2cbc3 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -83,7 +83,7 @@ public class RemoteLogSegmentMetadataTransform implements 
RemoteLogMetadataTrans
                 new RemoteLogSegmentMetadata(remoteLogSegmentId, 
record.startOffset(), record.endOffset(),
                                              record.maxTimestampMs(), 
record.brokerId(),
                                              record.eventTimestampMs(), 
record.segmentSizeInBytes(),
-                                             segmentLeaderEpochs, 0);
+                                             segmentLeaderEpochs);
         RemoteLogSegmentMetadataUpdate rlsmUpdate
                 = new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, 
record.eventTimestampMs(),
                                                      customMetadata,
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
index c92efce68af..66176c68477 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
@@ -425,7 +425,7 @@ public class ConsumerTaskTest {
                            final TopicIdPartition idPartition,
                            final long recordOffset) {
         final RemoteLogSegmentId segmentId = new 
RemoteLogSegmentId(idPartition, Uuid.randomUuid());
-        final RemoteLogMetadata metadata = new 
RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, 
Collections.singletonMap(0, 0L), 0);
+        final RemoteLogMetadata metadata = new 
RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, 
Collections.singletonMap(0, 0L));
         final ConsumerRecord<byte[], byte[]> record = new 
ConsumerRecord<>(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME,
 metadataPartition, recordOffset, null, serde.serialize(metadata));
         consumer.addRecord(record);
     }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
index 20918e0ca06..6b93f61dc7c 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
@@ -60,7 +60,7 @@ public class RemoteLogMetadataCacheTest {
             if (state != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
                 RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0, 
Uuid.randomUuid());
                 RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, 0, 100L,
-                        -1L, brokerId0, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L), 0);
+                        -1L, brokerId0, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L));
                 RemoteLogSegmentMetadata updatedMetadata = 
segmentMetadata.createWithUpdates(
                         new RemoteLogSegmentMetadataUpdate(segmentId, 
time.milliseconds(), Optional.empty(),
                                 state, brokerId1));
@@ -102,7 +102,7 @@ public class RemoteLogMetadataCacheTest {
         long offset = 10L;
         RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0, 
Uuid.randomUuid());
         RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, offset, 100L,
-                -1L, brokerId0, time.milliseconds(), segmentSize, 
Collections.singletonMap(leaderEpoch, offset), 0);
+                -1L, brokerId0, time.milliseconds(), segmentSize, 
Collections.singletonMap(leaderEpoch, offset));
         cache.addCopyInProgressSegment(segmentMetadata);
 
         // invalid-transition-1. COPY_SEGMENT_STARTED -> 
DELETE_SEGMENT_FINISHED
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index a4ac17cef23..d6a03441e8b 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -53,7 +53,7 @@ public class RemoteLogMetadataFormatterTest {
         Optional<CustomMetadata> customMetadata = Optional.of(new 
CustomMetadata(new byte[10]));
         RemoteLogSegmentMetadata remoteLogMetadata = new 
RemoteLogSegmentMetadata(
                 remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, 
customMetadata, COPY_SEGMENT_STARTED,
-                segLeaderEpochs, 0);
+                segLeaderEpochs);
 
         byte[] metadataBytes = new 
RemoteLogMetadataSerde().serialize(remoteLogMetadata);
         ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>(
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index 6e2b8960da1..ba2c3d1f26e 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -72,7 +72,7 @@ public class RemoteLogMetadataSerdeTest {
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 
1, 
                 time.milliseconds(), 1024, Optional.of(new CustomMetadata(new 
byte[] {0, 1, 2, 3})),
-                COPY_SEGMENT_STARTED, segLeaderEpochs, 0);
+                COPY_SEGMENT_STARTED, segLeaderEpochs);
     }
 
     private RemoteLogSegmentMetadataUpdate 
createRemoteLogSegmentMetadataUpdate() {
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
index 41d82897033..e7131a81303 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
@@ -68,7 +68,7 @@ public class RemoteLogMetadataTransformTest {
     private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 
1,
-                time.milliseconds(), 1024, Collections.singletonMap(0, 0L), 0);
+                time.milliseconds(), 1024, Collections.singletonMap(0, 0L));
     }
 
     @Test
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index 0a36ef1b559..0d91f69d4bb 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -94,7 +94,7 @@ public class RemoteLogSegmentLifecycleTest {
             leaderEpochSegment0.put(2, 80L);
             RemoteLogSegmentId segmentId0 = new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
             RemoteLogSegmentMetadata metadataSegment0 = new 
RemoteLogSegmentMetadata(segmentId0, 0L,
-                    100L, -1L, brokerId0, time.milliseconds(), segSize, 
leaderEpochSegment0, 0);
+                    100L, -1L, brokerId0, time.milliseconds(), segSize, 
leaderEpochSegment0);
             
metadataManager.addRemoteLogSegmentMetadata(metadataSegment0).get();
             
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(metadataSegment0);
 
@@ -223,7 +223,7 @@ public class RemoteLogSegmentLifecycleTest {
             throws RemoteStorageException, ExecutionException, 
InterruptedException {
         RemoteLogSegmentId segmentId = new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
         RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
-                -1L, brokerId0, time.milliseconds(), segSize, 
segmentLeaderEpochs, 0);
+                -1L, brokerId0, time.milliseconds(), segSize, 
segmentLeaderEpochs);
         metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
         
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
 
@@ -258,7 +258,7 @@ public class RemoteLogSegmentLifecycleTest {
             // segments.
             RemoteLogSegmentId segmentId = new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
             RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, 0L, 50L,
-                    -1L, brokerId0, time.milliseconds(), segSize, 
Collections.singletonMap(0, 0L), 0);
+                    -1L, brokerId0, time.milliseconds(), segSize, 
Collections.singletonMap(0, 0L));
             metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
             
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
 
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
index 55541d18a42..f64996ae46d 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -127,7 +127,7 @@ public class 
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
             int segSize = 1048576;
             RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
                 0, 100, -1L, 0,
-                time.milliseconds(), segSize, Collections.singletonMap(0, 0L), 
0);
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
             ExecutionException exception = 
assertThrows(ExecutionException.class,
                     () -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
             assertEquals("org.apache.kafka.common.KafkaException: This 
consumer is not assigned to the target partition 0. Currently assigned 
partitions: []",
@@ -135,7 +135,7 @@ public class 
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
 
             RemoteLogSegmentMetadata followerSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, 
Uuid.randomUuid()),
                 0, 100, -1L, 0,
-                time.milliseconds(), segSize, Collections.singletonMap(0, 0L), 
0);
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
             exception = assertThrows(ExecutionException.class, () -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
             assertEquals("org.apache.kafka.common.KafkaException: This 
consumer is not assigned to the target partition 0. Currently assigned 
partitions: []",
                 exception.getMessage());
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 4f95fac24d8..783aa022df7 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -79,11 +79,11 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
         RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(
                 new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
                 0, 100, -1L, 0,
-                time.milliseconds(), segSize, Collections.singletonMap(0, 0L), 
0);
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata followerSegmentMetadata = new 
RemoteLogSegmentMetadata(
                 new RemoteLogSegmentId(followerTopicIdPartition, 
Uuid.randomUuid()),
                 0, 100, -1L, 0,
-                time.milliseconds(), segSize, Collections.singletonMap(0, 0L), 
0);
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
 
         try (TopicBasedRemoteLogMetadataManager 
topicBasedRemoteLogMetadataManager = 
createTopicBasedRemoteLogMetadataManager()) {
             // Register these partitions to RemoteLogMetadataManager.
@@ -113,7 +113,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
             RemoteLogSegmentMetadata leaderSegmentMetadata2 = new 
RemoteLogSegmentMetadata(
                     new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
                     101, 200, -1L, 0,
-                    time.milliseconds(), segSize, Collections.singletonMap(0, 
101L), 0);
+                    time.milliseconds(), segSize, Collections.singletonMap(0, 
101L));
             
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get();
 
             // Check that both the stored segment and recently added segment 
are available.
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index b0a65bd9c0a..d7842187596 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -141,12 +141,12 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         // has not yet been subscribing as they are not yet registered.
         RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, 
Uuid.randomUuid()),
                                                                                
 0, 100, -1L, 0,
-                                                                               
 time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L), 0);
+                                                                               
 time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
         assertThrows(Exception.class, () -> 
topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
 
         RemoteLogSegmentMetadata followerSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, 
Uuid.randomUuid()),
                                                                                
 0, 100, -1L, 0,
-                                                                               
 time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L), 0);
+                                                                               
 time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
         assertThrows(Exception.class, () -> 
topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
 
         // `listRemoteLogSegments` will receive an exception as these topic 
partitions are not yet registered.
@@ -195,11 +195,11 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         
}).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
 
         RemoteLogSegmentMetadata firstSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, 
Uuid.randomUuid()),
-                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, 
Collections.singletonMap(0, 0L), 0);
+                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, 
Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata secondSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, 
Uuid.randomUuid()),
-                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, 
Collections.singletonMap(0, 0L), 0);
+                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, 
Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata thirdSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, 
Uuid.randomUuid()),
-                200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, 
Collections.singletonMap(0, 0L), 0);
+                200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, 
Collections.singletonMap(0, 0L));
 
         
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
         
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
@@ -240,11 +240,11 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         
}).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
 
         RemoteLogSegmentMetadata firstSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, 
Uuid.randomUuid()),
-                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, 
Collections.singletonMap(0, 0L), 0);
+                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, 
Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata secondSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, 
Uuid.randomUuid()),
-                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, 
Collections.singletonMap(1, 100L), 0);
+                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, 
Collections.singletonMap(1, 100L));
         RemoteLogSegmentMetadata thirdSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, 
Uuid.randomUuid()),
-                200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, 
Collections.singletonMap(2, 200L), 0);
+                200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, 
Collections.singletonMap(2, 200L));
 
         
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
         
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
@@ -285,9 +285,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         
}).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
 
         RemoteLogSegmentMetadata firstSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, 
Uuid.randomUuid()),
-                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, 
Collections.singletonMap(0, 0L), 0);
+                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, 
Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata secondSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, 
Uuid.randomUuid()),
-                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, 
Collections.singletonMap(1, 100L), 0);
+                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, 
Collections.singletonMap(1, 100L));
 
         
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
         
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
index bf370d55c13..0745bbaeb8f 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
@@ -45,7 +45,8 @@ class RemoteLogSegmentMetadataTransformTest {
                 0L, 100L, -1L, 0, 0, 1234,
                 customMetadata,
                 RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
-                segmentLeaderEpochs,  0);
+                segmentLeaderEpochs
+        );
 
         RemoteLogSegmentMetadataTransform transform = new 
RemoteLogSegmentMetadataTransform();
         ApiMessageAndVersion message = 
transform.toApiMessageAndVersion(metadata);
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
index d3391a45387..983fb0a46f6 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
@@ -358,7 +358,7 @@ public final class LocalTieredStorageTest {
 
     private RemoteLogSegmentMetadata newRemoteLogSegmentMetadata(final 
RemoteLogSegmentId id) {
         return new RemoteLogSegmentMetadata(id, 0, 0, -1L, -1, 1000L,
-                1024, Collections.singletonMap(0, 0L), 0);
+                1024, Collections.singletonMap(0, 0L));
     }
 
     private RemoteLogSegmentId newRemoteLogSegmentId() {
@@ -539,7 +539,7 @@ public final class LocalTieredStorageTest {
 
         private RemoteLogSegmentMetadata newMetadata(final RemoteLogSegmentId 
id) {
             return new RemoteLogSegmentMetadata(id, 0, 0, -1L, -1, 1000,
-                    1024, Collections.singletonMap(0, 0L), 0);
+                    1024, Collections.singletonMap(0, 0L));
         }
 
         private String getStorageRootDirectory() {
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 6c3ed561600..a425c999c03 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -76,7 +76,7 @@ public class RemoteLogMetadataManagerTest {
             Map<Integer, Long> segmentLeaderEpochs = 
Collections.singletonMap(0, 101L);
             RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
             RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(
-                    segmentId, 101L, 200L, -1L, BROKER_ID_0, 
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs, 0);
+                    segmentId, 101L, 200L, -1L, BROKER_ID_0, 
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
             // Wait until the segment is added successfully.
             assertDoesNotThrow(() -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
 
@@ -115,7 +115,7 @@ public class RemoteLogMetadataManagerTest {
             segmentLeaderEpochs.put(3, 80L);
             RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
             RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(
-                    segmentId, 0L, 100L, -1L, BROKER_ID_0, 
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs, 0);
+                    segmentId, 0L, 100L, -1L, BROKER_ID_0, 
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
             // Wait until the segment is added successfully.
             assertDoesNotThrow(() -> 
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
 


Reply via email to