kamalcph commented on code in PR #14285:
URL: https://github.com/apache/kafka/pull/14285#discussion_r1305437640
##########
core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala:
##########
@@ -365,12 +363,11 @@ object MyRemoteStorageManager {
   val deleteSegmentEventCounter = new AtomicInteger(0)
 }

-class MyRemoteStorageManager extends NoOpRemoteStorageManager with Logging {
+class MyRemoteStorageManager extends NoOpRemoteStorageManager {
   import MyRemoteStorageManager._

   override def deleteLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Unit = {
     deleteSegmentEventCounter.incrementAndGet()
-    info(s"Deleted the remote log segment: $remoteLogSegmentMetadata, counter: ${deleteSegmentEventCounter.get()}")

Review Comment:
Previously, we verified this manually by counting the log statements. Fixed by adding an assertion that validates the number of deleted segments.

##########
storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java:
##########
@@ -138,18 +140,21 @@ private boolean compare(ByteBuffer lhs,
     private SimpleRecord convert(Object recordCandidate) {
         if (recordCandidate instanceof ProducerRecord) {
             ProducerRecord<?, ?> record = (ProducerRecord<?, ?>) recordCandidate;
+            long timestamp = record.timestamp() == null ? RecordBatch.NO_TIMESTAMP : record.timestamp();
             ByteBuffer keyBytes = Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key()));
             ByteBuffer valueBytes = Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value()));
-            return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, record.headers().toArray());
+            Header[] headers = record.headers() != null ? record.headers().toArray() : Record.EMPTY_HEADERS;

Review Comment:
We have three different record types: ProducerRecord, ConsumerRecord and SimpleRecord. They don't extend a common interface.
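To illustrate the point above, here is a minimal, self-contained sketch of the dispatch-and-default pattern that `convert(Object recordCandidate)` follows: since the three record classes share no common interface, the converter branches on the runtime type and substitutes sentinel defaults for nullable fields. The stub classes below are hypothetical stand-ins, not the real Kafka types.

```java
// Hypothetical stand-ins for the three Kafka record types discussed above;
// the real ProducerRecord/ConsumerRecord/SimpleRecord share no common interface.
class ProducerRecordStub {
    final Long timestamp; // nullable on a ProducerRecord
    final String key;
    ProducerRecordStub(Long timestamp, String key) { this.timestamp = timestamp; this.key = key; }
}

class ConsumerRecordStub {
    final long timestamp; // always set on a consumed record
    final String key;
    ConsumerRecordStub(long timestamp, String key) { this.timestamp = timestamp; this.key = key; }
}

class SimpleRecordStub {
    final long timestamp;
    final String key;
    SimpleRecordStub(long timestamp, String key) { this.timestamp = timestamp; this.key = key; }
}

class RecordConverter {
    static final long NO_TIMESTAMP = -1L; // mirrors RecordBatch.NO_TIMESTAMP

    // Branch on the runtime type; default nullable fields before converting.
    static SimpleRecordStub convert(Object candidate) {
        if (candidate instanceof ProducerRecordStub) {
            ProducerRecordStub r = (ProducerRecordStub) candidate;
            long ts = (r.timestamp == null) ? NO_TIMESTAMP : r.timestamp;
            return new SimpleRecordStub(ts, r.key);
        } else if (candidate instanceof ConsumerRecordStub) {
            ConsumerRecordStub r = (ConsumerRecordStub) candidate;
            return new SimpleRecordStub(r.timestamp, r.key);
        } else if (candidate instanceof SimpleRecordStub) {
            return (SimpleRecordStub) candidate;
        }
        throw new IllegalArgumentException("Unsupported record type: " + candidate.getClass());
    }
}
```

The same shape appears in the diff: the null checks on `record.timestamp()` and `record.headers()` exist precisely because the producer-side record permits nulls that `SimpleRecord` does not.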
##########
storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java:
##########
@@ -127,10 +127,10 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
      * - For topic B, only one segment is present in the tiered storage, as asserted by the
      *   previous sub-test-case.
      */
-    .bounce(broker)
+    // .bounce(broker)

Review Comment:
Bouncing the broker updates the log-start-offset to the first local log segment's base offset in [LogLoader](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogLoader.scala#L183), so the remote log segments are removed due to a breach by the log-start-offset. Will fix this issue as part of KAFKA-15351.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org