This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8438c4339e0 KAFKA-17245: Revert TopicRecord changes. (#16780)
8438c4339e0 is described below
commit 8438c4339e0a103d95575336a9f3653698cd0b8e
Author: TengYao Chi <[email protected]>
AuthorDate: Sat Aug 3 20:15:51 2024 +0800
KAFKA-17245: Revert TopicRecord changes. (#16780)
Revert KAFKA-16257 changes because KIP-950 doesn't need it anymore.
Reviewers: Luke Chen <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 3 +--
.../kafka/log/remote/RemoteLogManagerTest.java | 12 ++++++------
.../kafka/admin/RemoteTopicCrudTest.scala | 2 +-
.../kafka/log/remote/RemoteIndexCacheTest.scala | 8 ++++----
.../resources/common/metadata/TopicRecord.json | 10 ++--------
.../kafka/metadata/util/RecordRedactorTest.java | 2 +-
.../remote/storage/RemoteLogSegmentMetadata.java | 22 ++++------------------
.../storage/RemoteLogSegmentMetadataTest.java | 8 +++-----
.../RemoteLogSegmentMetadataTransform.java | 2 +-
.../remote/metadata/storage/ConsumerTaskTest.java | 2 +-
.../storage/RemoteLogMetadataCacheTest.java | 4 ++--
.../storage/RemoteLogMetadataFormatterTest.java | 2 +-
.../storage/RemoteLogMetadataSerdeTest.java | 2 +-
.../storage/RemoteLogMetadataTransformTest.java | 2 +-
.../storage/RemoteLogSegmentLifecycleTest.java | 6 +++---
...ogMetadataManagerMultipleSubscriptionsTest.java | 4 ++--
...icBasedRemoteLogMetadataManagerRestartTest.java | 6 +++---
.../TopicBasedRemoteLogMetadataManagerTest.java | 20 ++++++++++----------
.../RemoteLogSegmentMetadataTransformTest.java | 3 ++-
.../log/remote/storage/LocalTieredStorageTest.java | 4 ++--
.../storage/RemoteLogMetadataManagerTest.java | 4 ++--
21 files changed, 53 insertions(+), 75 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index f53f5a35623..a1714211af5 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -933,7 +933,6 @@ public class RemoteLogManager implements Closeable {
logger.info("Copying {} to remote storage.", logFileName);
long endOffset = nextSegmentBaseOffset - 1;
- int tieredEpoch = 0;
File producerStateSnapshotFile =
log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null);
List<EpochEntry> epochEntries = getLeaderEpochEntries(log,
segment.baseOffset(), nextSegmentBaseOffset);
@@ -942,7 +941,7 @@ public class RemoteLogManager implements Closeable {
RemoteLogSegmentMetadata copySegmentStartedRlsm = new
RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset,
segment.largestTimestamp(), brokerId, time.milliseconds(),
segment.log().sizeInBytes(),
- segmentLeaderEpochs, tieredEpoch);
+ segmentLeaderEpochs);
remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index f27e39c1f1d..302b6adfe48 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -1566,7 +1566,7 @@ public class RemoteLogManagerTest {
100000L,
1000,
Optional.empty(),
- RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs, 0);
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
}
@Test
@@ -1906,9 +1906,9 @@ public class RemoteLogManagerTest {
int segmentSize = 1024;
List<RemoteLogSegmentMetadata> segmentMetadataList = Arrays.asList(
new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
- 500, 539, timestamp, brokerId, timestamp, segmentSize,
truncateAndGetLeaderEpochs(epochEntries, 500L, 539L), 0),
+ 500, 539, timestamp, brokerId, timestamp, segmentSize,
truncateAndGetLeaderEpochs(epochEntries, 500L, 539L)),
new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
- 540, 700, timestamp, brokerId, timestamp, segmentSize,
truncateAndGetLeaderEpochs(epochEntries, 540L, 700L), 0)
+ 540, 700, timestamp, brokerId, timestamp, segmentSize,
truncateAndGetLeaderEpochs(epochEntries, 540L, 700L))
);
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition),
anyInt()))
.thenAnswer(invocation -> {
@@ -2231,7 +2231,7 @@ public class RemoteLogManagerTest {
RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
metadata1.startOffset(), metadata1.endOffset() + 5,
metadata1.maxTimestampMs(),
metadata1.brokerId() + 1, metadata1.eventTimestampMs(),
metadata1.segmentSizeInBytes() + 128,
- metadata1.customMetadata(), metadata1.state(),
metadata1.segmentLeaderEpochs(), 0);
+ metadata1.customMetadata(), metadata1.state(),
metadata1.segmentLeaderEpochs());
// When there are overlapping/duplicate segments, the
RemoteLogMetadataManager#listRemoteLogSegments
// returns the segments in order of (valid ++ unreferenced) segments:
@@ -2657,8 +2657,8 @@ public class RemoteLogManagerTest {
segmentSize,
Optional.empty(),
state,
- segmentLeaderEpochs,
- 0);
+ segmentLeaderEpochs
+ );
segmentMetadataList.add(metadata);
}
return segmentMetadataList;
diff --git
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 25f366bfc94..c88962cb845 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -491,7 +491,7 @@ class MyRemoteLogMetadataManager extends
NoOpRemoteLogMetadataManager {
val startOffset = idx * recordsPerSegment
val endOffset = startOffset + recordsPerSegment - 1
val segmentLeaderEpochs: util.Map[Integer, java.lang.Long] =
Collections.singletonMap(0, 0L)
- segmentMetadataList.add(new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), startOffset,
endOffset, timestamp, 0, timestamp, segmentSize, Optional.empty(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentLeaderEpochs, 0))
+ segmentMetadataList.add(new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), startOffset,
endOffset, timestamp, 0, timestamp, segmentSize, Optional.empty(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentLeaderEpochs))
}
segmentMetadataList.iterator()
}
diff --git
a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 95e28282f97..6ebcbeb4fcd 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -67,7 +67,7 @@ class RemoteIndexCacheTest {
Files.createDirectory(tpDir.toPath)
val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)
- rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset,
lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize,
Collections.singletonMap(0, 0L), 0)
+ rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset,
lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize,
Collections.singletonMap(0, 0L))
cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm,
tpDir.toString)
@@ -678,7 +678,7 @@ class RemoteIndexCacheTest {
@Test
def testConcurrentRemoveReadForCache(): Unit = {
// Create a spy Cache Entry
- val rlsMetadata = new
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition),
baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(),
segmentSize, Collections.singletonMap(0, 0L), 0)
+ val rlsMetadata = new
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition),
baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(),
segmentSize, Collections.singletonMap(0, 0L))
val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new
File(tpDir, DIR_NAME)))
val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new
File(tpDir, DIR_NAME)))
@@ -926,7 +926,7 @@ class RemoteIndexCacheTest {
private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
=
RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
- val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId,
baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(),
segmentSize, Collections.singletonMap(0, 0L), 0)
+ val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId,
baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(),
segmentSize, Collections.singletonMap(0, 0L))
val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, tpDir))
val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, tpDir))
val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata,
tpDir))
@@ -993,7 +993,7 @@ class RemoteIndexCacheTest {
tpId: TopicIdPartition):
List[RemoteLogSegmentMetadata] = {
val metadataList = mutable.Buffer.empty[RemoteLogSegmentMetadata]
for (i <- 0 until size) {
- metadataList.append(new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(tpId, Uuid.randomUuid()), baseOffset * i, baseOffset * i +
10, time.milliseconds(), brokerId, time.milliseconds(), segmentSize,
Collections.singletonMap(0, 0L), 0))
+ metadataList.append(new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(tpId, Uuid.randomUuid()), baseOffset * i, baseOffset * i +
10, time.milliseconds(), brokerId, time.milliseconds(), segmentSize,
Collections.singletonMap(0, 0L)))
}
metadataList.toList
}
diff --git a/metadata/src/main/resources/common/metadata/TopicRecord.json
b/metadata/src/main/resources/common/metadata/TopicRecord.json
index 5a316e6dbba..6fa5a05096d 100644
--- a/metadata/src/main/resources/common/metadata/TopicRecord.json
+++ b/metadata/src/main/resources/common/metadata/TopicRecord.json
@@ -17,18 +17,12 @@
"apiKey": 2,
"type": "metadata",
"name": "TopicRecord",
- // Version 0 first version of TopicRecord with Name and TopicId
- // Version 1 adds TieredEpoch and TieredState for KIP-950
- "validVersions": "0-1",
+ "validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType":
"topicName",
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
- "about": "The unique ID of this topic." },
- { "name": "TieredEpoch", "type": "int32", "versions": "1+",
- "about": "The epoch denoting how many times the tiered state has
changed" },
- { "name": "TieredState", "type": "bool", "versions": "1+",
- "about": "Denotes whether the topic is currently tiered or not" }
+ "about": "The unique ID of this topic." }
]
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
index a5a18e582a0..0fbd4ef00ae 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
@@ -50,7 +50,7 @@ public final class RecordRedactorTest {
@Test
public void testTopicRecordToString() {
- assertEquals("TopicRecord(name='foo', topicId=UOovKkohSU6AGdYW33ZUNg,
tieredEpoch=0, tieredState=false)",
+ assertEquals("TopicRecord(name='foo', topicId=UOovKkohSU6AGdYW33ZUNg)",
REDACTOR.toLoggableString(new TopicRecord().
setTopicId(Uuid.fromString("UOovKkohSU6AGdYW33ZUNg")).
setName("foo")));
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 4389a90ab8a..9b589322bbf 100644
---
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -78,11 +78,6 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
*/
private final RemoteLogSegmentState state;
- /**
- * The epoch denoting how many times the tiered state has changed
- */
- private final int tieredEpoch;
-
/**
* Creates an instance with the given metadata of remote log segment.
* <p>
@@ -99,7 +94,6 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
* @param customMetadata Custom metadata.
* @param state State of the respective segment of
remoteLogSegmentId.
* @param segmentLeaderEpochs leader epochs occurred within this segment.
- * @param tieredEpoch The epoch denoting how many times the tiered
state has changed
*/
public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
long startOffset,
@@ -110,11 +104,10 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
int segmentSizeInBytes,
Optional<CustomMetadata> customMetadata,
RemoteLogSegmentState state,
- Map<Integer, Long> segmentLeaderEpochs,
int tieredEpoch) {
+ Map<Integer, Long> segmentLeaderEpochs) {
super(brokerId, eventTimestampMs);
this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId,
"remoteLogSegmentId can not be null");
this.state = Objects.requireNonNull(state, "state can not be null");
- this.tieredEpoch = tieredEpoch;
if (startOffset < 0) {
throw new IllegalArgumentException("Unexpected start offset = " +
startOffset + ". StartOffset for a remote segment cannot be negative");
@@ -151,7 +144,6 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
* @param eventTimestampMs Epoch time in milli seconds at which the
remote log segment is copied to the remote tier storage.
* @param segmentSizeInBytes Size of this segment in bytes.
* @param segmentLeaderEpochs leader epochs occurred within this segment
- * @param tieredEpoch
*/
public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
long startOffset,
@@ -160,8 +152,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
int brokerId,
long eventTimestampMs,
int segmentSizeInBytes,
- Map<Integer, Long> segmentLeaderEpochs,
- int tieredEpoch) {
+ Map<Integer, Long> segmentLeaderEpochs) {
this(remoteLogSegmentId,
startOffset,
endOffset,
@@ -170,8 +161,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
eventTimestampMs, segmentSizeInBytes,
Optional.empty(),
RemoteLogSegmentState.COPY_SEGMENT_STARTED,
- segmentLeaderEpochs,
- tieredEpoch);
+ segmentLeaderEpochs);
}
@@ -237,10 +227,6 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
return state;
}
- public int tieredEpoch() {
- return tieredEpoch;
- }
-
/**
* Creates a new RemoteLogSegmentMetadata applying the given {@code
rlsmUpdate} on this instance. This method will
* not update this instance.
@@ -255,7 +241,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset,
endOffset, maxTimestampMs, rlsmUpdate.brokerId(),
rlsmUpdate.eventTimestampMs(),
- segmentSizeInBytes, rlsmUpdate.customMetadata(),
rlsmUpdate.state(), segmentLeaderEpochs, tieredEpoch);
+ segmentSizeInBytes, rlsmUpdate.customMetadata(),
rlsmUpdate.state(), segmentLeaderEpochs);
}
@Override
diff --git
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
index 4e11f57e39a..0aaaa4d99e4 100644
---
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
+++
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
@@ -43,14 +43,13 @@ class RemoteLogSegmentMetadataTest {
long endOffset = 100L;
int segmentSize = 123;
long maxTimestamp = -1L;
- int tieredEpoch = 0;
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
segmentLeaderEpochs.put(0, 0L);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
maxTimestamp, brokerId, eventTimestamp, segmentSize,
- segmentLeaderEpochs, tieredEpoch);
+ segmentLeaderEpochs);
CustomMetadata customMetadata = new CustomMetadata(new byte[]{0, 1, 2,
3});
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(
@@ -62,8 +61,8 @@ class RemoteLogSegmentMetadataTest {
segmentId, startOffset, endOffset,
maxTimestamp, brokerIdFinished, timestampFinished,
segmentSize, Optional.of(customMetadata),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
- segmentLeaderEpochs,
- tieredEpoch);
+ segmentLeaderEpochs
+ );
assertEquals(expectedUpdatedMetadata, updatedMetadata);
// Check that the original metadata have not changed.
@@ -75,6 +74,5 @@ class RemoteLogSegmentMetadataTest {
assertEquals(eventTimestamp, segmentMetadata.eventTimestampMs());
assertEquals(segmentSize, segmentMetadata.segmentSizeInBytes());
assertEquals(segmentLeaderEpochs,
segmentMetadata.segmentLeaderEpochs());
- assertEquals(tieredEpoch, segmentMetadata.tieredEpoch());
}
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index f97ca86b9ff..9e893d2cbc3 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -83,7 +83,7 @@ public class RemoteLogSegmentMetadataTransform implements
RemoteLogMetadataTrans
new RemoteLogSegmentMetadata(remoteLogSegmentId,
record.startOffset(), record.endOffset(),
record.maxTimestampMs(),
record.brokerId(),
record.eventTimestampMs(),
record.segmentSizeInBytes(),
- segmentLeaderEpochs, 0);
+ segmentLeaderEpochs);
RemoteLogSegmentMetadataUpdate rlsmUpdate
= new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId,
record.eventTimestampMs(),
customMetadata,
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
index c92efce68af..66176c68477 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
@@ -425,7 +425,7 @@ public class ConsumerTaskTest {
final TopicIdPartition idPartition,
final long recordOffset) {
final RemoteLogSegmentId segmentId = new
RemoteLogSegmentId(idPartition, Uuid.randomUuid());
- final RemoteLogMetadata metadata = new
RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1,
Collections.singletonMap(0, 0L), 0);
+ final RemoteLogMetadata metadata = new
RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1,
Collections.singletonMap(0, 0L));
final ConsumerRecord<byte[], byte[]> record = new
ConsumerRecord<>(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME,
metadataPartition, recordOffset, null, serde.serialize(metadata));
consumer.addRecord(record);
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
index 20918e0ca06..6b93f61dc7c 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
@@ -60,7 +60,7 @@ public class RemoteLogMetadataCacheTest {
if (state != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0,
Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, 0, 100L,
- -1L, brokerId0, time.milliseconds(), segmentSize,
Collections.singletonMap(0, 0L), 0);
+ -1L, brokerId0, time.milliseconds(), segmentSize,
Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata updatedMetadata =
segmentMetadata.createWithUpdates(
new RemoteLogSegmentMetadataUpdate(segmentId,
time.milliseconds(), Optional.empty(),
state, brokerId1));
@@ -102,7 +102,7 @@ public class RemoteLogMetadataCacheTest {
long offset = 10L;
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0,
Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, offset, 100L,
- -1L, brokerId0, time.milliseconds(), segmentSize,
Collections.singletonMap(leaderEpoch, offset), 0);
+ -1L, brokerId0, time.milliseconds(), segmentSize,
Collections.singletonMap(leaderEpoch, offset));
cache.addCopyInProgressSegment(segmentMetadata);
// invalid-transition-1. COPY_SEGMENT_STARTED ->
DELETE_SEGMENT_FINISHED
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index a4ac17cef23..d6a03441e8b 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -53,7 +53,7 @@ public class RemoteLogMetadataFormatterTest {
Optional<CustomMetadata> customMetadata = Optional.of(new
CustomMetadata(new byte[10]));
RemoteLogSegmentMetadata remoteLogMetadata = new
RemoteLogSegmentMetadata(
remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024,
customMetadata, COPY_SEGMENT_STARTED,
- segLeaderEpochs, 0);
+ segLeaderEpochs);
byte[] metadataBytes = new
RemoteLogMetadataSerde().serialize(remoteLogMetadata);
ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>(
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index 6e2b8960da1..ba2c3d1f26e 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -72,7 +72,7 @@ public class RemoteLogMetadataSerdeTest {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L,
1,
time.milliseconds(), 1024, Optional.of(new CustomMetadata(new
byte[] {0, 1, 2, 3})),
- COPY_SEGMENT_STARTED, segLeaderEpochs, 0);
+ COPY_SEGMENT_STARTED, segLeaderEpochs);
}
private RemoteLogSegmentMetadataUpdate
createRemoteLogSegmentMetadataUpdate() {
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
index 41d82897033..e7131a81303 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
@@ -68,7 +68,7 @@ public class RemoteLogMetadataTransformTest {
private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L,
1,
- time.milliseconds(), 1024, Collections.singletonMap(0, 0L), 0);
+ time.milliseconds(), 1024, Collections.singletonMap(0, 0L));
}
@Test
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index 0a36ef1b559..0d91f69d4bb 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -94,7 +94,7 @@ public class RemoteLogSegmentLifecycleTest {
leaderEpochSegment0.put(2, 80L);
RemoteLogSegmentId segmentId0 = new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
RemoteLogSegmentMetadata metadataSegment0 = new
RemoteLogSegmentMetadata(segmentId0, 0L,
- 100L, -1L, brokerId0, time.milliseconds(), segSize,
leaderEpochSegment0, 0);
+ 100L, -1L, brokerId0, time.milliseconds(), segSize,
leaderEpochSegment0);
metadataManager.addRemoteLogSegmentMetadata(metadataSegment0).get();
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(metadataSegment0);
@@ -223,7 +223,7 @@ public class RemoteLogSegmentLifecycleTest {
throws RemoteStorageException, ExecutionException,
InterruptedException {
RemoteLogSegmentId segmentId = new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
- -1L, brokerId0, time.milliseconds(), segSize,
segmentLeaderEpochs, 0);
+ -1L, brokerId0, time.milliseconds(), segSize,
segmentLeaderEpochs);
metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
@@ -258,7 +258,7 @@ public class RemoteLogSegmentLifecycleTest {
// segments.
RemoteLogSegmentId segmentId = new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, 0L, 50L,
- -1L, brokerId0, time.milliseconds(), segSize,
Collections.singletonMap(0, 0L), 0);
+ -1L, brokerId0, time.milliseconds(), segSize,
Collections.singletonMap(0, 0L));
metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
index 55541d18a42..f64996ae46d 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -127,7 +127,7 @@ public class
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
int segSize = 1048576;
RemoteLogSegmentMetadata leaderSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition,
Uuid.randomUuid()),
0, 100, -1L, 0,
- time.milliseconds(), segSize, Collections.singletonMap(0, 0L),
0);
+ time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
ExecutionException exception =
assertThrows(ExecutionException.class,
() ->
remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
assertEquals("org.apache.kafka.common.KafkaException: This
consumer is not assigned to the target partition 0. Currently assigned
partitions: []",
@@ -135,7 +135,7 @@ public class
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
RemoteLogSegmentMetadata followerSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition,
Uuid.randomUuid()),
0, 100, -1L, 0,
- time.milliseconds(), segSize, Collections.singletonMap(0, 0L),
0);
+ time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
exception = assertThrows(ExecutionException.class, () ->
remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
assertEquals("org.apache.kafka.common.KafkaException: This
consumer is not assigned to the target partition 0. Currently assigned
partitions: []",
exception.getMessage());
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 4f95fac24d8..783aa022df7 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -79,11 +79,11 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
RemoteLogSegmentMetadata leaderSegmentMetadata = new
RemoteLogSegmentMetadata(
new RemoteLogSegmentId(leaderTopicIdPartition,
Uuid.randomUuid()),
0, 100, -1L, 0,
- time.milliseconds(), segSize, Collections.singletonMap(0, 0L),
0);
+ time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata followerSegmentMetadata = new
RemoteLogSegmentMetadata(
new RemoteLogSegmentId(followerTopicIdPartition,
Uuid.randomUuid()),
0, 100, -1L, 0,
- time.milliseconds(), segSize, Collections.singletonMap(0, 0L),
0);
+ time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
try (TopicBasedRemoteLogMetadataManager
topicBasedRemoteLogMetadataManager =
createTopicBasedRemoteLogMetadataManager()) {
// Register these partitions to RemoteLogMetadataManager.
@@ -113,7 +113,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
RemoteLogSegmentMetadata leaderSegmentMetadata2 = new
RemoteLogSegmentMetadata(
new RemoteLogSegmentId(leaderTopicIdPartition,
Uuid.randomUuid()),
101, 200, -1L, 0,
- time.milliseconds(), segSize, Collections.singletonMap(0,
101L), 0);
+ time.milliseconds(), segSize, Collections.singletonMap(0,
101L));
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get();
// Check that both the stored segment and recently added segment
are available.
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index b0a65bd9c0a..d7842187596 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -141,12 +141,12 @@ public class TopicBasedRemoteLogMetadataManagerTest {
// has not yet been subscribing as they are not yet registered.
RemoteLogSegmentMetadata leaderSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition,
Uuid.randomUuid()),
0, 100, -1L, 0,
-
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L), 0);
+
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
assertThrows(Exception.class, () ->
topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
RemoteLogSegmentMetadata followerSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition,
Uuid.randomUuid()),
0, 100, -1L, 0,
-
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L), 0);
+
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
assertThrows(Exception.class, () ->
topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
// `listRemoteLogSegments` will receive an exception as these topic
partitions are not yet registered.
@@ -195,11 +195,11 @@ public class TopicBasedRemoteLogMetadataManagerTest {
}).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
RemoteLogSegmentMetadata firstSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition,
Uuid.randomUuid()),
- 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE,
Collections.singletonMap(0, 0L), 0);
+ 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE,
Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata secondSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition,
Uuid.randomUuid()),
- 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2,
Collections.singletonMap(0, 0L), 0);
+ 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2,
Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata thirdSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition,
Uuid.randomUuid()),
- 200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3,
Collections.singletonMap(0, 0L), 0);
+ 200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3,
Collections.singletonMap(0, 0L));
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
@@ -240,11 +240,11 @@ public class TopicBasedRemoteLogMetadataManagerTest {
}).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
RemoteLogSegmentMetadata firstSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition,
Uuid.randomUuid()),
- 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE,
Collections.singletonMap(0, 0L), 0);
+ 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE,
Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata secondSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition,
Uuid.randomUuid()),
- 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2,
Collections.singletonMap(1, 100L), 0);
+ 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2,
Collections.singletonMap(1, 100L));
RemoteLogSegmentMetadata thirdSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition,
Uuid.randomUuid()),
- 200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3,
Collections.singletonMap(2, 200L), 0);
+ 200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3,
Collections.singletonMap(2, 200L));
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
@@ -285,9 +285,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
}).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
RemoteLogSegmentMetadata firstSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition,
Uuid.randomUuid()),
- 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE,
Collections.singletonMap(0, 0L), 0);
+ 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE,
Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata secondSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition,
Uuid.randomUuid()),
- 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2,
Collections.singletonMap(1, 100L), 0);
+ 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2,
Collections.singletonMap(1, 100L));
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
index bf370d55c13..0745bbaeb8f 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
@@ -45,7 +45,8 @@ class RemoteLogSegmentMetadataTransformTest {
0L, 100L, -1L, 0, 0, 1234,
customMetadata,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
- segmentLeaderEpochs, 0);
+ segmentLeaderEpochs
+ );
RemoteLogSegmentMetadataTransform transform = new
RemoteLogSegmentMetadataTransform();
ApiMessageAndVersion message =
transform.toApiMessageAndVersion(metadata);
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
index b967e1b739c..f1358b838cb 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
@@ -358,7 +358,7 @@ public final class LocalTieredStorageTest {
private RemoteLogSegmentMetadata newRemoteLogSegmentMetadata(final
RemoteLogSegmentId id) {
return new RemoteLogSegmentMetadata(id, 0, 0, -1L, -1, 1000L,
- 1024, Collections.singletonMap(0, 0L), 0);
+ 1024, Collections.singletonMap(0, 0L));
}
private RemoteLogSegmentId newRemoteLogSegmentId() {
@@ -539,7 +539,7 @@ public final class LocalTieredStorageTest {
private RemoteLogSegmentMetadata newMetadata(final RemoteLogSegmentId
id) {
return new RemoteLogSegmentMetadata(id, 0, 0, -1L, -1, 1000,
- 1024, Collections.singletonMap(0, 0L), 0);
+ 1024, Collections.singletonMap(0, 0L));
}
private String getStorageRootDirectory() {
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 6c3ed561600..a425c999c03 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -76,7 +76,7 @@ public class RemoteLogMetadataManagerTest {
Map<Integer, Long> segmentLeaderEpochs =
Collections.singletonMap(0, 101L);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(
- segmentId, 101L, 200L, -1L, BROKER_ID_0,
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs, 0);
+ segmentId, 101L, 200L, -1L, BROKER_ID_0,
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
// Wait until the segment is added successfully.
assertDoesNotThrow(() ->
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
@@ -115,7 +115,7 @@ public class RemoteLogMetadataManagerTest {
segmentLeaderEpochs.put(3, 80L);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(
- segmentId, 0L, 100L, -1L, BROKER_ID_0,
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs, 0);
+ segmentId, 0L, 100L, -1L, BROKER_ID_0,
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
// Wait until the segment is added successfully.
assertDoesNotThrow(() ->
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());