This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4e75c57bbb79014a0efd3d396630c810506c5921
Author: TengYao Chi <[email protected]>
AuthorDate: Sat Aug 3 20:15:51 2024 +0800

    KAFKA-17245: Revert TopicRecord changes. (#16780)

    Revert KAFKA-16257 changes because KIP-950 doesn't need it anymore.

    Reviewers: Luke Chen <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  3 +--
 .../kafka/log/remote/RemoteLogManagerTest.java     | 12 ++++++------
 .../kafka/admin/RemoteTopicCrudTest.scala          |  2 +-
 .../kafka/log/remote/RemoteIndexCacheTest.scala    |  8 ++++----
 .../resources/common/metadata/TopicRecord.json     | 10 ++--------
 .../kafka/metadata/util/RecordRedactorTest.java    |  2 +-
 .../remote/storage/RemoteLogSegmentMetadata.java   | 22 ++++------------------
 .../storage/RemoteLogSegmentMetadataTest.java      |  8 +++-----
 .../RemoteLogSegmentMetadataTransform.java         |  2 +-
 .../remote/metadata/storage/ConsumerTaskTest.java  |  2 +-
 .../storage/RemoteLogMetadataCacheTest.java        |  4 ++--
 .../storage/RemoteLogMetadataFormatterTest.java    |  2 +-
 .../storage/RemoteLogMetadataSerdeTest.java        |  2 +-
 .../storage/RemoteLogMetadataTransformTest.java    |  2 +-
 .../storage/RemoteLogSegmentLifecycleTest.java     |  6 +++---
 ...ogMetadataManagerMultipleSubscriptionsTest.java |  4 ++--
 ...icBasedRemoteLogMetadataManagerRestartTest.java |  6 +++---
 .../TopicBasedRemoteLogMetadataManagerTest.java    | 20 ++++++++++----------
 .../RemoteLogSegmentMetadataTransformTest.java     |  3 ++-
 .../log/remote/storage/LocalTieredStorageTest.java |  4 ++--
 .../storage/RemoteLogMetadataManagerTest.java      |  4 ++--
 21 files changed, 53 insertions(+), 75 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index f3f0ccc11ad..7a087497bb6 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -913,7 +913,6 @@ public class RemoteLogManager implements Closeable {
             logger.info("Copying {} to remote storage.", logFileName);

             long endOffset = nextSegmentBaseOffset - 1;
-            int tieredEpoch = 0;
             File producerStateSnapshotFile = log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null);

             List<EpochEntry> epochEntries = getLeaderEpochEntries(log, segment.baseOffset(), nextSegmentBaseOffset);
@@ -922,7 +921,7 @@ public class RemoteLogManager implements Closeable {

             RemoteLogSegmentMetadata copySegmentStartedRlsm = new RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset,
                     segment.largestTimestamp(), brokerId, time.milliseconds(), segment.log().sizeInBytes(),
-                    segmentLeaderEpochs, tieredEpoch);
+                    segmentLeaderEpochs);

             remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();

diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4e5f70f9337..ce0939737b1 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -1566,7 +1566,7 @@ public class RemoteLogManagerTest {
             100000L,
             1000,
             Optional.empty(),
-            RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs, 0);
+            RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
     }

     @Test
@@ -1906,9 +1906,9 @@ public class RemoteLogManagerTest {
         int segmentSize = 1024;
         List<RemoteLogSegmentMetadata> segmentMetadataList = Arrays.asList(
             new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
-                500, 539, timestamp, brokerId, timestamp, segmentSize, truncateAndGetLeaderEpochs(epochEntries, 500L, 539L), 0),
+                500, 539, timestamp, brokerId, timestamp, segmentSize, truncateAndGetLeaderEpochs(epochEntries, 500L, 539L)),
             new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
-                540, 700, timestamp, brokerId, timestamp, segmentSize, truncateAndGetLeaderEpochs(epochEntries, 540L, 700L), 0)
+                540, 700, timestamp, brokerId, timestamp, segmentSize, truncateAndGetLeaderEpochs(epochEntries, 540L, 700L))
         );
         when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
             .thenAnswer(invocation -> {
@@ -2231,7 +2231,7 @@ public class RemoteLogManagerTest {
         RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
             metadata1.startOffset(), metadata1.endOffset() + 5, metadata1.maxTimestampMs(), metadata1.brokerId() + 1,
             metadata1.eventTimestampMs(), metadata1.segmentSizeInBytes() + 128,
-            metadata1.customMetadata(), metadata1.state(), metadata1.segmentLeaderEpochs(), 0);
+            metadata1.customMetadata(), metadata1.state(), metadata1.segmentLeaderEpochs());

         // When there are overlapping/duplicate segments, the RemoteLogMetadataManager#listRemoteLogSegments
         // returns the segments in order of (valid ++ unreferenced) segments:
@@ -2657,8 +2657,8 @@ public class RemoteLogManagerTest {
                 segmentSize,
                 Optional.empty(),
                 state,
-                segmentLeaderEpochs,
-                0);
+                segmentLeaderEpochs
+            );
             segmentMetadataList.add(metadata);
         }
         return segmentMetadataList;

diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index e362832d638..f995b86b704 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -436,7 +436,7 @@ class MyRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager {
       val startOffset = idx * recordsPerSegment
       val endOffset = startOffset + recordsPerSegment - 1
       val segmentLeaderEpochs: util.Map[Integer, java.lang.Long] = Collections.singletonMap(0, 0L)
-      segmentMetadataList.add(new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), startOffset, endOffset, timestamp, 0, timestamp, segmentSize, Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentLeaderEpochs, 0))
+      segmentMetadataList.add(new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), startOffset, endOffset, timestamp, 0, timestamp, segmentSize, Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentLeaderEpochs))
     }
     segmentMetadataList.iterator()
   }

diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 95e28282f97..6ebcbeb4fcd 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -67,7 +67,7 @@ class RemoteIndexCacheTest {
     Files.createDirectory(tpDir.toPath)

     val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)
-    rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L), 0)
+    rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))

     cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString)

@@ -678,7 +678,7 @@ class RemoteIndexCacheTest {
   @Test
   def testConcurrentRemoveReadForCache(): Unit = {
     // Create a spy Cache Entry
-    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L), 0)
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))

     val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
     val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
@@ -926,7 +926,7 @@ class RemoteIndexCacheTest {

   private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
                                     = RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
-    val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L), 0)
+    val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
     val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, tpDir))
     val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, tpDir))
     val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, tpDir))
@@ -993,7 +993,7 @@ class RemoteIndexCacheTest {
                                                tpId: TopicIdPartition): List[RemoteLogSegmentMetadata] = {
     val metadataList = mutable.Buffer.empty[RemoteLogSegmentMetadata]
     for (i <- 0 until size) {
-      metadataList.append(new RemoteLogSegmentMetadata(new RemoteLogSegmentId(tpId, Uuid.randomUuid()), baseOffset * i, baseOffset * i + 10, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L), 0))
+      metadataList.append(new RemoteLogSegmentMetadata(new RemoteLogSegmentId(tpId, Uuid.randomUuid()), baseOffset * i, baseOffset * i + 10, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L)))
     }
     metadataList.toList
   }

diff --git a/metadata/src/main/resources/common/metadata/TopicRecord.json b/metadata/src/main/resources/common/metadata/TopicRecord.json
index 5a316e6dbba..6fa5a05096d 100644
--- a/metadata/src/main/resources/common/metadata/TopicRecord.json
+++ b/metadata/src/main/resources/common/metadata/TopicRecord.json
@@ -17,18 +17,12 @@
   "apiKey": 2,
   "type": "metadata",
   "name": "TopicRecord",
-  // Version 0 first version of TopicRecord with Name and TopicId
-  // Version 1 adds TieredEpoch and TieredState for KIP-950
-  "validVersions": "0-1",
+  "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
       "about": "The topic name." },
     { "name": "TopicId", "type": "uuid", "versions": "0+",
-      "about": "The unique ID of this topic." },
-    { "name": "TieredEpoch", "type": "int32", "versions": "1+",
-      "about": "The epoch denoting how many times the tiered state has changed" },
-    { "name": "TieredState", "type": "bool", "versions": "1+",
-      "about": "Denotes whether the topic is currently tiered or not" }
+      "about": "The unique ID of this topic." }
   ]
 }

diff --git a/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java b/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
index a5a18e582a0..0fbd4ef00ae 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
@@ -50,7 +50,7 @@ public final class RecordRedactorTest {

     @Test
     public void testTopicRecordToString() {
-        assertEquals("TopicRecord(name='foo', topicId=UOovKkohSU6AGdYW33ZUNg, tieredEpoch=0, tieredState=false)",
+        assertEquals("TopicRecord(name='foo', topicId=UOovKkohSU6AGdYW33ZUNg)",
                 REDACTOR.toLoggableString(new TopicRecord().
                         setTopicId(Uuid.fromString("UOovKkohSU6AGdYW33ZUNg")).
                         setName("foo")));

diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 4389a90ab8a..9b589322bbf 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -78,11 +78,6 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
      */
     private final RemoteLogSegmentState state;

-    /**
-     * The epoch denoting how many times the tiered state has changed
-     */
-    private final int tieredEpoch;
-
     /**
      * Creates an instance with the given metadata of remote log segment.
      * <p>
@@ -99,7 +94,6 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
     * @param customMetadata      Custom metadata.
     * @param state               State of the respective segment of remoteLogSegmentId.
     * @param segmentLeaderEpochs leader epochs occurred within this segment.
-     * @param tieredEpoch         The epoch denoting how many times the tiered state has changed
      */
     public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
                                     long startOffset,
@@ -110,11 +104,10 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
                                     int segmentSizeInBytes,
                                     Optional<CustomMetadata> customMetadata,
                                     RemoteLogSegmentState state,
-                                    Map<Integer, Long> segmentLeaderEpochs, int tieredEpoch) {
+                                    Map<Integer, Long> segmentLeaderEpochs) {
         super(brokerId, eventTimestampMs);
         this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, "remoteLogSegmentId can not be null");
         this.state = Objects.requireNonNull(state, "state can not be null");
-        this.tieredEpoch = tieredEpoch;

         if (startOffset < 0) {
             throw new IllegalArgumentException("Unexpected start offset = " + startOffset + ". StartOffset for a remote segment cannot be negative");
@@ -151,7 +144,6 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
     * @param eventTimestampMs   Epoch time in milli seconds at which the remote log segment is copied to the remote tier storage.
     * @param segmentSizeInBytes Size of this segment in bytes.
     * @param segmentLeaderEpochs leader epochs occurred within this segment
-     * @param tieredEpoch
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
                                    long startOffset,
@@ -160,8 +152,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
                                    int brokerId,
                                    long eventTimestampMs,
                                    int segmentSizeInBytes,
-                                    Map<Integer, Long> segmentLeaderEpochs,
-                                    int tieredEpoch) {
+                                    Map<Integer, Long> segmentLeaderEpochs) {
        this(remoteLogSegmentId,
                startOffset,
                endOffset,
@@ -170,8 +161,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
                eventTimestampMs,
                segmentSizeInBytes,
                Optional.empty(),
                RemoteLogSegmentState.COPY_SEGMENT_STARTED,
-                segmentLeaderEpochs,
-                tieredEpoch);
+                segmentLeaderEpochs);
    }

@@ -237,10 +227,6 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
         return state;
     }

-    public int tieredEpoch() {
-        return tieredEpoch;
-    }
-
     /**
      * Creates a new RemoteLogSegmentMetadata applying the given {@code rlsmUpdate} on this instance. This method will
      * not update this instance.
@@ -255,7 +241,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset,
                 endOffset, maxTimestampMs, rlsmUpdate.brokerId(), rlsmUpdate.eventTimestampMs(),
-                segmentSizeInBytes, rlsmUpdate.customMetadata(), rlsmUpdate.state(), segmentLeaderEpochs, tieredEpoch);
+                segmentSizeInBytes, rlsmUpdate.customMetadata(), rlsmUpdate.state(), segmentLeaderEpochs);
     }

     @Override

diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
index 4e11f57e39a..0aaaa4d99e4 100644
--- a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
@@ -43,14 +43,13 @@ class RemoteLogSegmentMetadataTest {
         long endOffset = 100L;
         int segmentSize = 123;
         long maxTimestamp = -1L;
-        int tieredEpoch = 0;
         Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
         segmentLeaderEpochs.put(0, 0L);
         RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
         RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId,
                 startOffset, endOffset, maxTimestamp, brokerId, eventTimestamp, segmentSize,
-                segmentLeaderEpochs, tieredEpoch);
+                segmentLeaderEpochs);

         CustomMetadata customMetadata = new CustomMetadata(new byte[]{0, 1, 2, 3});
         RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
@@ -62,8 +61,8 @@ class RemoteLogSegmentMetadataTest {
                 segmentId, startOffset, endOffset, maxTimestamp, brokerIdFinished,
                 timestampFinished, segmentSize, Optional.of(customMetadata),
                 RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
-                segmentLeaderEpochs,
-                tieredEpoch);
+                segmentLeaderEpochs
+        );
         assertEquals(expectedUpdatedMetadata, updatedMetadata);

         // Check that the original metadata have not changed.
@@ -75,6 +74,5 @@ class RemoteLogSegmentMetadataTest {
         assertEquals(eventTimestamp, segmentMetadata.eventTimestampMs());
         assertEquals(segmentSize, segmentMetadata.segmentSizeInBytes());
         assertEquals(segmentLeaderEpochs, segmentMetadata.segmentLeaderEpochs());
-        assertEquals(tieredEpoch, segmentMetadata.tieredEpoch());
     }
 }

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index f97ca86b9ff..9e893d2cbc3 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -83,7 +83,7 @@ public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTrans
                 new RemoteLogSegmentMetadata(remoteLogSegmentId, record.startOffset(), record.endOffset(),
                         record.maxTimestampMs(), record.brokerId(), record.eventTimestampMs(), record.segmentSizeInBytes(),
-                        segmentLeaderEpochs, 0);
+                        segmentLeaderEpochs);

         RemoteLogSegmentMetadataUpdate rlsmUpdate = new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, record.eventTimestampMs(), customMetadata,

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
index c92efce68af..66176c68477 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
@@ -425,7 +425,7 @@ public class ConsumerTaskTest {
                                    final TopicIdPartition idPartition,
                                    final long recordOffset) {
         final RemoteLogSegmentId segmentId = new RemoteLogSegmentId(idPartition, Uuid.randomUuid());
-        final RemoteLogMetadata metadata = new RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, Collections.singletonMap(0, 0L), 0);
+        final RemoteLogMetadata metadata = new RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, Collections.singletonMap(0, 0L));
         final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME,
                 metadataPartition, recordOffset, null, serde.serialize(metadata));
         consumer.addRecord(record);
     }

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
index 20918e0ca06..6b93f61dc7c 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
@@ -60,7 +60,7 @@ public class RemoteLogMetadataCacheTest {
             if (state != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
                 RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid());
                 RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0, 100L,
-                        -1L, brokerId0, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L), 0);
+                        -1L, brokerId0, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
                 RemoteLogSegmentMetadata updatedMetadata = segmentMetadata.createWithUpdates(
                         new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), Optional.empty(), state, brokerId1));
@@ -102,7 +102,7 @@ public class RemoteLogMetadataCacheTest {
         long offset = 10L;
         RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid());
         RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, offset, 100L,
-                -1L, brokerId0, time.milliseconds(), segmentSize, Collections.singletonMap(leaderEpoch, offset), 0);
+                -1L, brokerId0, time.milliseconds(), segmentSize, Collections.singletonMap(leaderEpoch, offset));
         cache.addCopyInProgressSegment(segmentMetadata);

         // invalid-transition-1. COPY_SEGMENT_STARTED -> DELETE_SEGMENT_FINISHED

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index a4ac17cef23..d6a03441e8b 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -53,7 +53,7 @@ public class RemoteLogMetadataFormatterTest {
         Optional<CustomMetadata> customMetadata = Optional.of(new CustomMetadata(new byte[10]));
         RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata(
                 remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED,
-                segLeaderEpochs, 0);
+                segLeaderEpochs);

         byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata);
         ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>(

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index 6e2b8960da1..ba2c3d1f26e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -72,7 +72,7 @@ public class RemoteLogMetadataSerdeTest {
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, time.milliseconds(),
                 1024, Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
-                COPY_SEGMENT_STARTED, segLeaderEpochs, 0);
+                COPY_SEGMENT_STARTED, segLeaderEpochs);
     }

     private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() {

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
index 41d82897033..e7131a81303 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
@@ -68,7 +68,7 @@ public class RemoteLogMetadataTransformTest {
     private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
-                time.milliseconds(), 1024, Collections.singletonMap(0, 0L), 0);
+                time.milliseconds(), 1024, Collections.singletonMap(0, 0L));
     }

     @Test

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index 0a36ef1b559..0d91f69d4bb 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -94,7 +94,7 @@ public class RemoteLogSegmentLifecycleTest {
         leaderEpochSegment0.put(2, 80L);
         RemoteLogSegmentId segmentId0 = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
         RemoteLogSegmentMetadata metadataSegment0 = new RemoteLogSegmentMetadata(segmentId0, 0L,
-                100L, -1L, brokerId0, time.milliseconds(), segSize, leaderEpochSegment0, 0);
+                100L, -1L, brokerId0, time.milliseconds(), segSize, leaderEpochSegment0);
         metadataManager.addRemoteLogSegmentMetadata(metadataSegment0).get();
         verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(metadataSegment0);

@@ -223,7 +223,7 @@ public class RemoteLogSegmentLifecycleTest {
             throws RemoteStorageException, ExecutionException, InterruptedException {
         RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
         RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
-                -1L, brokerId0, time.milliseconds(), segSize, segmentLeaderEpochs, 0);
+                -1L, brokerId0, time.milliseconds(), segSize, segmentLeaderEpochs);
         metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
         verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);

@@ -258,7 +258,7 @@ public class RemoteLogSegmentLifecycleTest {
         // segments.
         RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
         RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L,
-                -1L, brokerId0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L), 0);
+                -1L, brokerId0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
         metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
         verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
index 55541d18a42..f64996ae46d 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -127,7 +127,7 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
         int segSize = 1048576;
         RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0,
-                time.milliseconds(), segSize, Collections.singletonMap(0, 0L), 0);
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
         ExecutionException exception = assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
         assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
@@ -135,7 +135,7 @@
         RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0,
-                time.milliseconds(), segSize, Collections.singletonMap(0, 0L), 0);
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
         exception = assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
         assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
                 exception.getMessage());

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 4f95fac24d8..783aa022df7 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -79,11 +79,11 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
         RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(
                 new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
                 0, 100, -1L, 0,
-                time.milliseconds(), segSize, Collections.singletonMap(0, 0L), 0);
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(
                 new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
                 0, 100, -1L, 0,
-                time.milliseconds(), segSize, Collections.singletonMap(0, 0L), 0);
+                time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
         try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) {
             // Register these partitions to RemoteLogMetadataManager.
@@ -113,7 +113,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
             RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata(
                     new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
                     101, 200, -1L, 0,
-                    time.milliseconds(), segSize, Collections.singletonMap(0, 101L), 0);
+                    time.milliseconds(), segSize, Collections.singletonMap(0, 101L));
             topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get();

             // Check that both the stored segment and recently added segment are available.

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index b0a65bd9c0a..d7842187596 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -141,12 +141,12 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         // has not yet been subscribing as they are not yet registered.
         RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0,
-                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L), 0);
+                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
         assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());

         RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0,
-                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L), 0);
+                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
         assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());

         // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered.
@@ -195,11 +195,11 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());

         RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
-                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L), 0);
+                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
-                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(0, 0L), 0);
+                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
-                200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(0, 0L), 0);
+                200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(0, 0L));

         topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
         topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
@@ -240,11 +240,11 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());

         RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
-                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L), 0);
+                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
-                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L), 0);
+                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L));
         RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
-                200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(2, 200L), 0);
+                200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(2, 200L));

         topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
         topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
@@ -285,9 +285,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());

         RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
-                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L), 0);
+                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
-                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L), 0);
+                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L));

         topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
         topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
index bf370d55c13..0745bbaeb8f 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
@@ -45,7 +45,8 @@ class RemoteLogSegmentMetadataTransformTest {
                 0L, 100L, -1L, 0, 0, 1234,
                 customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
-                segmentLeaderEpochs, 0);
+                segmentLeaderEpochs
+        );

         RemoteLogSegmentMetadataTransform transform = new RemoteLogSegmentMetadataTransform();
         ApiMessageAndVersion message = transform.toApiMessageAndVersion(metadata);

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
index d3391a45387..983fb0a46f6 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
@@ -358,7 +358,7 @@ public final class LocalTieredStorageTest {

     private RemoteLogSegmentMetadata newRemoteLogSegmentMetadata(final RemoteLogSegmentId id) {
         return new RemoteLogSegmentMetadata(id, 0, 0, -1L, -1, 1000L,
-                1024, Collections.singletonMap(0, 0L), 0);
+                1024, Collections.singletonMap(0, 0L));
     }

     private RemoteLogSegmentId newRemoteLogSegmentId() {
@@ -539,7 +539,7 @@ public final class LocalTieredStorageTest {

         private RemoteLogSegmentMetadata newMetadata(final RemoteLogSegmentId id) {
             return new RemoteLogSegmentMetadata(id, 0, 0, -1L, -1, 1000,
-                    1024, Collections.singletonMap(0, 0L), 0);
+                    1024, Collections.singletonMap(0, 0L));
         }

         private String getStorageRootDirectory() {

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 6c3ed561600..a425c999c03 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -76,7 +76,7 @@ public class RemoteLogMetadataManagerTest {
         Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 101L);
         RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
         RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(
-                segmentId, 101L, 200L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs, 0);
+                segmentId, 101L, 200L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);

         // Wait until the segment is added successfully.
         assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
@@ -115,7 +115,7 @@ public class RemoteLogMetadataManagerTest {
         segmentLeaderEpochs.put(3, 80L);
         RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
         RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(
-                segmentId, 0L, 100L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs, 0);
+                segmentId, 0L, 100L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);

         // Wait until the segment is added successfully.
         assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get());
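
For reference: after this revert, both public constructors of RemoteLogSegmentMetadata drop the trailing tieredEpoch argument, and the shorter constructor still defaults the segment state to RemoteLogSegmentState.COPY_SEGMENT_STARTED, as the patch above shows. A minimal caller sketch against the reverted API follows; the topic name, offsets, and sizes below are illustrative placeholders, not values from the patch:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.TopicIdPartition;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

    public class RemoteLogSegmentMetadataExample {
        public static void main(String[] args) {
            // Hypothetical partition and a freshly generated segment id.
            TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(),
                    new TopicPartition("orders", 0));
            RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tp, Uuid.randomUuid());

            // Leader epoch 0 begins at offset 0 within this segment.
            Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 0L);

            // Reverted signature: (id, startOffset, endOffset, maxTimestampMs,
            // brokerId, eventTimestampMs, segmentSizeInBytes, segmentLeaderEpochs)
            // -- no trailing tieredEpoch int; state defaults to COPY_SEGMENT_STARTED.
            RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(
                    segmentId, 0L, 100L, -1L, 0, System.currentTimeMillis(), 1024,
                    segmentLeaderEpochs);
            System.out.println(metadata);
        }
    }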
