This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 6d077eca9f7 KAFKA-15399: Enable OffloadAndConsumeFromLeader test (#14285)
6d077eca9f7 is described below

commit 6d077eca9f7459a9e8c79efc3471194bddfe0fbb
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Mon Aug 28 15:59:50 2023 +0530

    KAFKA-15399: Enable OffloadAndConsumeFromLeader test (#14285)
    
    Reviewers: Divij Vaidya <[email protected]>, Christo Lolov <[email protected]>, Satish Duggana <[email protected]>
---
 checkstyle/import-control-storage.xml                 |  1 +
 .../main/java/kafka/log/remote/RemoteLogManager.java  |  4 ++--
 .../integration/kafka/admin/RemoteTopicCrudTest.scala | 19 ++++++++-----------
 .../server/log/remote/storage/LocalTieredStorage.java | 17 ++---------------
 .../remote/storage/LocalTieredStorageCondition.java   | 16 ++++++++++++----
 .../log/remote/storage/LocalTieredStorageEvent.java   |  6 ++++++
 .../tiered/storage/TieredStorageTestContext.java      |  9 ++++-----
 .../tiered/storage/TieredStorageTestHarness.java      |  6 ++++--
 .../kafka/tiered/storage/actions/ProduceAction.java   |  1 +
 .../integration/OffloadAndConsumeFromLeaderTest.java  |  6 +++---
 .../tiered/storage/utils/RecordsKeyValueMatcher.java  |  9 +++++++--
 11 files changed, 50 insertions(+), 44 deletions(-)

diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index 6da7ac68849..2e0b85dcf76 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -97,6 +97,7 @@
         <allow pkg="kafka.utils" />
 
         <allow pkg="org.apache.kafka.common.config" />
+        <allow pkg="org.apache.kafka.common.header" />
         <allow pkg="org.apache.kafka.common.record" />
         <allow pkg="org.apache.kafka.common.replica" />
         <allow pkg="org.apache.kafka.common.network" />
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index f84fd2f80f7..8122d7992da 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -594,13 +594,13 @@ public class RemoteLogManager implements Closeable {
 
         private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException {
             if (!copiedOffsetOption.isPresent()) {
-                logger.info("Find the highest remote offset for partition: {} after becoming leader, leaderEpoch: {}", topicIdPartition, leaderEpoch);
-
                 // This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset
                 // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
                 // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
                 // epoch cache then it starts copying the segments from the earliest epoch entry's offset.
                 copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
+                logger.info("Found the highest copied remote offset: {} for partition: {} after becoming leader, " +
+                                "leaderEpoch: {}", copiedOffsetOption, topicIdPartition, leaderEpoch);
             }
         }
 
diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index bd81b40fa8e..23439e120cf 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -18,7 +18,7 @@ package kafka.admin
 
 import kafka.api.IntegrationTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.{Logging, TestInfoUtils, TestUtils}
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
@@ -283,6 +283,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testTopicDeletion(quorum: String): Unit = {
+    MyRemoteStorageManager.deleteSegmentEventCounter.set(0)
     val numPartitions = 2
     val topicConfig = new Properties()
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@@ -293,12 +294,9 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
     assertThrowsException(classOf[UnknownTopicOrPartitionException],
       () => TestUtils.describeTopic(createAdminClient(), testTopicName), "Topic should be deleted")
-
-    // FIXME: It seems the storage manager is being instantiated in different class loader so couldn't verify the value
-    //  but ensured it by adding a log statement in the storage manager (manually).
-    //    assertEquals(numPartitions * MyRemoteLogMetadataManager.segmentCount,
-    //      MyRemoteStorageManager.deleteSegmentEventCounter.get(),
-    //      "Remote log segments should be deleted only once by the leader")
+    TestUtils.waitUntilTrue(() =>
+      numPartitions * MyRemoteLogMetadataManager.segmentCountPerPartition == MyRemoteStorageManager.deleteSegmentEventCounter.get(),
+      "Remote log segments should be deleted only once by the leader")
   }
 
   private def assertThrowsException(exceptionType: Class[_ <: Throwable],
@@ -365,12 +363,11 @@ object MyRemoteStorageManager {
   val deleteSegmentEventCounter = new AtomicInteger(0)
 }
 
-class MyRemoteStorageManager extends NoOpRemoteStorageManager with Logging {
+class MyRemoteStorageManager extends NoOpRemoteStorageManager {
   import MyRemoteStorageManager._
 
   override def deleteLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Unit = {
     deleteSegmentEventCounter.incrementAndGet()
-    info(s"Deleted the remote log segment: $remoteLogSegmentMetadata, counter: ${deleteSegmentEventCounter.get()}")
   }
 }
 
@@ -381,7 +378,7 @@ class MyRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager {
 
   override def listRemoteLogSegments(topicIdPartition: TopicIdPartition): util.Iterator[RemoteLogSegmentMetadata] = {
     val segmentMetadataList = new util.ArrayList[RemoteLogSegmentMetadata]()
-    for (idx <- 0 until segmentCount) {
+    for (idx <- 0 until segmentCountPerPartition) {
       val timestamp = time.milliseconds()
       val startOffset = idx * recordsPerSegment
       val endOffset = startOffset + recordsPerSegment - 1
@@ -395,7 +392,7 @@ class MyRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager {
 }
 
 object MyRemoteLogMetadataManager {
-  val segmentCount = 10
+  val segmentCountPerPartition = 10
   val recordsPerSegment = 100
   val segmentSize = 1024
 }
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index a8847e5bba9..43c09ccd908 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -306,19 +306,13 @@ public final class LocalTieredStorage implements RemoteStorageManager {
     public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final LogSegmentData data)
             throws RemoteStorageException {
         Callable<Optional<CustomMetadata>> callable = () -> {
-            final RemoteLogSegmentId id = metadata.remoteLogSegmentId();
-            final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(COPY_SEGMENT, id);
+            final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(COPY_SEGMENT, metadata);
             RemoteLogSegmentFileset fileset = null;
-
             try {
                 fileset = openFileset(storageDirectory, metadata);
-
-                logger.info("Offloading log segment for {} from segment={}", id.topicIdPartition(), data.logSegment());
-
+                logger.info("Offloading log segment for {} from segment={}", metadata.topicIdPartition(), data.logSegment());
                 fileset.copy(transferer, data);
-
                 storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build());
-
             } catch (final Exception e) {
                 // Keep the storage in a consistent state, i.e. a segment stored should always have with its
                 // associated offset and time indexes stored as well. Here, delete any file which was copied
@@ -327,14 +321,11 @@ public final class LocalTieredStorage implements RemoteStorageManager {
                 if (fileset != null) {
                     fileset.delete();
                 }
-
                 storageListeners.onStorageEvent(eventBuilder.withException(e).build());
                 throw e;
             }
-
             return Optional.empty();
         };
-
         return wrap(callable);
     }
 
@@ -503,10 +494,6 @@ public final class LocalTieredStorage implements RemoteStorageManager {
         return wrap(() -> storageDirectory.getAbsolutePath());
     }
 
-    private LocalTieredStorageEvent.Builder newEventBuilder(final EventType type, final RemoteLogSegmentId segId) {
-        return LocalTieredStorageEvent.newBuilder(brokerId, type, eventTimestamp.incrementAndGet(), segId);
-    }
-
     private LocalTieredStorageEvent.Builder newEventBuilder(final EventType type, final RemoteLogSegmentMetadata md) {
         return LocalTieredStorageEvent
                 .newBuilder(brokerId, type, eventTimestamp.incrementAndGet(), md.remoteLogSegmentId())
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageCondition.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageCondition.java
index 6039e18a5db..2bee267e7f6 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageCondition.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageCondition.java
@@ -46,6 +46,7 @@ public final class LocalTieredStorageCondition {
     final EventType eventType;
     final int brokerId;
     final TopicPartition topicPartition;
+    final Integer baseOffset;
     final boolean failed;
 
     private final InternalListener listener;
@@ -66,6 +67,7 @@ public final class LocalTieredStorageCondition {
      * @param eventType The nature of the event to match.
      * @param brokerId The broker which should have generated the event.
      * @param tp The topic-partition which the event should relate to.
+     * @param baseOffset The base offset of the segment which the event should relate to.
      * @param failed Whether the event should correspond to a failed interaction with the remote storage.
      *
      * @return A condition with the given characteristics which listens to the given storages and can
@@ -75,8 +77,11 @@ public final class LocalTieredStorageCondition {
                                                           final EventType eventType,
                                                           final int brokerId,
                                                           final TopicPartition tp,
+                                                          final Integer baseOffset,
                                                           final boolean failed) {
-        return expectEvent(storages, eventType, brokerId, tp, failed, 1);
+        final LocalTieredStorageCondition condition = new LocalTieredStorageCondition(eventType, brokerId, tp, failed, baseOffset, 1);
+        storages.forEach(storage -> storage.addListener(condition.listener));
+        return condition;
     }
 
     /**
@@ -106,7 +111,7 @@ public final class LocalTieredStorageCondition {
                                                           final TopicPartition tp,
                                                           final boolean failed,
                                                           final int latchCount) {
-        final LocalTieredStorageCondition condition = new LocalTieredStorageCondition(eventType, brokerId, tp, failed, latchCount);
+        final LocalTieredStorageCondition condition = new LocalTieredStorageCondition(eventType, brokerId, tp, failed, null, latchCount);
         storages.forEach(storage -> storage.addListener(condition.listener));
         return condition;
     }
@@ -170,8 +175,8 @@ public final class LocalTieredStorageCondition {
     }
 
     public String toString() {
-        return format("Condition[eventType=%s, brokerId=%d, topicPartition=%s, 
failed=%b]",
-                eventType, brokerId, topicPartition, failed);
+        return format("Condition[eventType=%s, brokerId=%d, topicPartition=%s, 
baseOffset=%d, failed=%b]",
+                eventType, brokerId, topicPartition, baseOffset, failed);
     }
 
     private static final class InternalListener implements LocalTieredStorageListener {
@@ -200,11 +205,13 @@ public final class LocalTieredStorageCondition {
                                         final int id,
                                         final TopicPartition tp,
                                         final boolean failed,
+                                        final Integer baseOffset,
                                         final int latchCount) {
         this.eventType = requireNonNull(type);
         this.brokerId = id;
         this.topicPartition = requireNonNull(tp);
         this.failed = failed;
+        this.baseOffset = baseOffset;
         this.listener = new InternalListener(this, latchCount);
         this.next = null;
     }
@@ -214,6 +221,7 @@ public final class LocalTieredStorageCondition {
         this.brokerId = h.brokerId;
         this.topicPartition = h.topicPartition;
         this.failed = h.failed;
+        this.baseOffset = h.baseOffset;
         this.listener = h.listener;
         this.next = requireNonNull(next);
     }
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java
index 2538eebec5c..1180163a889 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java
@@ -76,6 +76,12 @@ public final class LocalTieredStorageEvent implements Comparable<LocalTieredStor
         if (!exception.map(e -> condition.failed).orElseGet(() -> !condition.failed)) {
             return false;
         }
+        if (condition.baseOffset != null && !metadata.isPresent()) {
+            return false;
+        }
+        if (condition.baseOffset != null && metadata.get().startOffset() != 
condition.baseOffset) {
+            return false;
+        }
         return true;
     }
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index d38df06e5ae..593d69cb38c 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.tiered.storage;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
 import org.apache.kafka.tiered.storage.specs.TopicSpec;
 import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
@@ -37,7 +39,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
@@ -69,8 +70,8 @@ import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
 public final class TieredStorageTestContext implements AutoCloseable {
 
     private final TieredStorageTestHarness harness;
-    private final Serializer<String> ser = Serdes.String().serializer();
-    private final Deserializer<String> de = Serdes.String().deserializer();
+    private final Serializer<String> ser = new StringSerializer();
+    private final Deserializer<String> de = new StringDeserializer();
     private final Map<String, TopicSpec> topicSpecs = new HashMap<>();
     private final TieredStorageTestReport testReport;
 
@@ -309,7 +310,5 @@ public final class TieredStorageTestContext implements AutoCloseable {
 
     @Override
     public void close() throws IOException {
-        Utils.closeAll(producer, consumer);
-        Utils.closeQuietly(admin, "Admin client");
     }
 }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index da3762c5e9f..bed5452bdf5 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -32,7 +32,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -52,6 +51,7 @@ import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.
 import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
 import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
 import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP;
 
 import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
 import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
@@ -107,6 +107,7 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
         overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
                 TopicBasedRemoteLogMetadataManager.class.getName());
         overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, RLM_TASK_INTERVAL_MS.toString());
+        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");
 
         overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(""));
         overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(""));
@@ -153,7 +154,6 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
         context = new TieredStorageTestContext(this);
     }
 
-    @Disabled("Disabled until the trunk build is stable to test tiered 
storage")
     @Test
     public void executeTieredStorageTest() {
         TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
@@ -200,6 +200,8 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
                     if (loaderAwareRSM.delegate() instanceof LocalTieredStorage) {
                         storages.add((LocalTieredStorage) loaderAwareRSM.delegate());
                     }
+                } else if (storageManager instanceof LocalTieredStorage) {
+                    storages.add((LocalTieredStorage) storageManager);
                 }
             } else {
                 throw new AssertionError("Broker " + broker.config().brokerId()
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
index 9b20b3016ff..56287cebdcb 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
@@ -80,6 +80,7 @@ public final class ProduceAction implements TieredStorageTestAction {
                         COPY_SEGMENT,
                         spec.getSourceBrokerId(),
                         spec.getTopicPartition(),
+                        spec.getBaseOffset(),
                         false))
                 .collect(Collectors.toList());
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
index 93f2b0338c9..b5da2308d14 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
@@ -79,7 +79,7 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
                         enableRemoteLogStorage)
                 .withBatchSize(topicA, p0, batchSize)
                 .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k1", "v1"))
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k2", "v2"))
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k2", "v2"))
                 .produce(topicA, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"),
                         new KeyValueSpec("k3", "v3"))
 
@@ -127,10 +127,10 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
                 *       - For topic B, only one segment is present in the tiered storage, as asserted by the
                  *         previous sub-test-case.
                  */
-                .bounce(broker)
+                // .bounce(broker)
                 .expectFetchFromTieredStorage(broker, topicA, p0, 1)
-                .expectFetchFromTieredStorage(broker, topicB, p0, 2)
                 .consume(topicA, p0, 1L, 2, 1)
+                .expectFetchFromTieredStorage(broker, topicB, p0, 2)
                 .consume(topicB, p0, 1L, 4, 3);
     }
 }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java
index 5fa7749f74a..902b5c2d713 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java
@@ -19,7 +19,9 @@ package org.apache.kafka.tiered.storage.utils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
@@ -138,18 +140,21 @@ public final class RecordsKeyValueMatcher<R1, R2, K, V> extends TypeSafeDiagnosi
     private SimpleRecord convert(Object recordCandidate) {
         if (recordCandidate instanceof ProducerRecord) {
             ProducerRecord<?, ?> record = (ProducerRecord<?, ?>) recordCandidate;
+            long timestamp = record.timestamp() != null ? record.timestamp() : RecordBatch.NO_TIMESTAMP;
             ByteBuffer keyBytes =
                     Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key()));
             ByteBuffer valueBytes =
                     Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value()));
-            return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, record.headers().toArray());
+            Header[] headers = record.headers() != null ? record.headers().toArray() : Record.EMPTY_HEADERS;
+            return new SimpleRecord(timestamp, keyBytes, valueBytes, headers);
         } else if (recordCandidate instanceof ConsumerRecord) {
             ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) recordCandidate;
             ByteBuffer keyBytes =
                     Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key()));
             ByteBuffer valueBytes =
                     Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value()));
-            return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, record.headers().toArray());
+            Header[] headers = record.headers() != null ? record.headers().toArray() : Record.EMPTY_HEADERS;
+            return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, headers);
         } else if (recordCandidate instanceof Record) {
             Record record = (Record) recordCandidate;
             return new SimpleRecord(record.timestamp(), record.key(), record.value(), record.headers());
