This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b3db905b27f KAFKA-15107: Support custom metadata for remote log 
segment (#13984)
b3db905b27f is described below

commit b3db905b27ff4133f4018ac922c9ce2beb2d6087
Author: Ivan Yurchenko <[email protected]>
AuthorDate: Fri Aug 4 15:53:25 2023 +0300

    KAFKA-15107: Support custom metadata for remote log segment (#13984)
    
    * KAFKA-15107: Support custom metadata for remote log segment
    
    This commit makes the changes discussed in KIP-917. Mainly, it changes the 
`RemoteStorageManager` interface to return `CustomMetadata` and then 
ensures this custom metadata is stored, propagated, and (de-)serialized 
correctly along with the standard metadata throughout the whole lifecycle. It 
introduces the `remote.log.metadata.custom.metadata.max.bytes` setting to limit 
the custom metadata size accepted by the broker and to stop uploading in case a 
piece of metadata exceeds this limit.
    
    On testing:
    1. `RemoteLogManagerTest` checks the case when a piece of custom metadata 
is larger than the configured limit.
    2. `RemoteLogSegmentMetadataTest` checks if `createWithUpdates` works 
correctly, including custom metadata.
    3. `RemoteLogSegmentMetadataTransformTest`, 
`RemoteLogSegmentMetadataSnapshotTransformTest`, and 
`RemoteLogSegmentMetadataUpdateTransformTest` were added to test the 
corresponding class (de-)serialization, including custom metadata.
    4. `FileBasedRemoteLogMetadataCacheTest` checks if custom metadata are 
being correctly saved and loaded to a file (indirectly, via `equals`).
    5. `RemoteLogManagerConfigTest` checks if the configuration setting is 
handled correctly.
    
    Reviewers: Luke Chen <[email protected]>, Satish Duggana 
<[email protected]>, Divij Vaidya <[email protected]>
---
 checkstyle/suppressions.xml                        |   1 +
 .../CustomMetadataSizeLimitExceededException.java  |  20 ++++
 .../java/kafka/log/remote/RemoteLogManager.java    |  39 +++++++-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 111 +++++++++++++++++++--
 .../remote/storage/RemoteLogSegmentMetadata.java   |  75 +++++++++++++-
 .../storage/RemoteLogSegmentMetadataUpdate.java    |  26 ++++-
 .../log/remote/storage/RemoteStorageManager.java   |   7 +-
 .../remote/storage/NoOpRemoteStorageManager.java   |   8 +-
 .../storage/RemoteLogSegmentMetadataTest.java      |  77 ++++++++++++++
 .../storage/FileBasedRemoteLogMetadataCache.java   |   3 +-
 .../storage/RemoteLogSegmentMetadataSnapshot.java  |  35 ++++++-
 .../RemoteLogSegmentMetadataSnapshotTransform.java |   5 +
 .../RemoteLogSegmentMetadataTransform.java         |   5 +
 .../RemoteLogSegmentMetadataUpdateTransform.java   |   7 +-
 .../ClassLoaderAwareRemoteStorageManager.java      |  10 +-
 .../log/remote/storage/RemoteLogManagerConfig.java |  24 ++++-
 .../message/RemoteLogSegmentMetadataRecord.json    |   8 ++
 .../RemoteLogSegmentMetadataSnapshotRecord.json    |   8 ++
 .../RemoteLogSegmentMetadataUpdateRecord.json      |   8 ++
 .../FileBasedRemoteLogMetadataCacheTest.java       |  13 ++-
 .../storage/RemoteLogMetadataCacheTest.java        |  18 +++-
 .../storage/RemoteLogMetadataFormatterTest.java    |  14 ++-
 .../storage/RemoteLogMetadataSerdeTest.java        |   9 +-
 .../storage/RemoteLogMetadataSnapshotFileTest.java |   6 +-
 .../storage/RemoteLogMetadataTransformTest.java    |   3 +
 .../storage/RemoteLogSegmentLifecycleTest.java     |  10 +-
 ...oteLogSegmentMetadataSnapshotTransformTest.java |  60 +++++++++++
 .../RemoteLogSegmentMetadataTransformTest.java     |  62 ++++++++++++
 ...emoteLogSegmentMetadataUpdateTransformTest.java |  62 ++++++++++++
 .../storage/InmemoryRemoteStorageManager.java      |   8 +-
 .../log/remote/storage/LocalTieredStorage.java     |   9 +-
 .../remote/storage/RemoteLogManagerConfigTest.java |   4 +-
 .../storage/RemoteLogMetadataManagerTest.java      |   5 +-
 33 files changed, 700 insertions(+), 60 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6982920e155..e01e7094a16 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -40,6 +40,7 @@
               
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity" 
files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
     <suppress 
checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" 
files="(RemoteLogManager|RemoteLogManagerTest).java"/>
+    <suppress checks="ClassFanOutComplexity" 
files="RemoteLogManagerTest.java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
 
diff --git 
a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
 
b/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
new file mode 100644
index 00000000000..c893f3488de
--- /dev/null
+++ 
b/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+class CustomMetadataSizeLimitExceededException extends Exception {
+}
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6b723aa38fa..33bde33882f 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -47,6 +47,7 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -480,12 +481,14 @@ public class RemoteLogManager implements Closeable {
     class RLMTask extends CancellableRunnable {
 
         private final TopicIdPartition topicIdPartition;
+        private final int customMetadataSizeLimit;
         private final Logger logger;
 
         private volatile int leaderEpoch = -1;
 
-        public RLMTask(TopicIdPartition topicIdPartition) {
+        public RLMTask(TopicIdPartition topicIdPartition, int 
customMetadataSizeLimit) {
             this.topicIdPartition = topicIdPartition;
+            this.customMetadataSizeLimit = customMetadataSizeLimit;
             LogContext logContext = new LogContext("[RemoteLogManager=" + 
brokerId + " partition=" + topicIdPartition + "] ");
             logger = logContext.logger(RLMTask.class);
         }
@@ -586,6 +589,11 @@ public class RemoteLogManager implements Closeable {
                 } else {
                     logger.debug("Skipping copying segments, current 
read-offset:{}, and LSO:{}", copiedOffset, lso);
                 }
+            } catch (CustomMetadataSizeLimitExceededException e) {
+                // Only stop this task. Logging is done where the exception is 
thrown.
+                
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
+                
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
+                this.cancel();
             } catch (InterruptedException ex) {
                 throw ex;
             } catch (Exception ex) {
@@ -597,7 +605,8 @@ public class RemoteLogManager implements Closeable {
             }
         }
 
-        private void copyLogSegment(UnifiedLog log, LogSegment segment, long 
nextSegmentBaseOffset) throws InterruptedException, ExecutionException, 
RemoteStorageException, IOException {
+        private void copyLogSegment(UnifiedLog log, LogSegment segment, long 
nextSegmentBaseOffset) throws InterruptedException, ExecutionException, 
RemoteStorageException, IOException,
+                CustomMetadataSizeLimitExceededException {
             File logFile = segment.log().file();
             String logFileName = logFile.getName();
 
@@ -623,10 +632,30 @@ public class RemoteLogManager implements Closeable {
                     producerStateSnapshotFile.toPath(), leaderEpochsIndex);
             
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
             brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
-            remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, 
segmentData);
+            Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                    customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+            if (customMetadata.isPresent()) {
+                long customMetadataSize = customMetadata.get().value().length;
+                if (customMetadataSize > this.customMetadataSizeLimit) {
+                    CustomMetadataSizeLimitExceededException e = new 
CustomMetadataSizeLimitExceededException();
+                    logger.error("Custom metadata size {} exceeds configured 
limit {}." +
+                                    " Copying will be stopped and copied 
segment will be attempted to clean." +
+                                    " Original metadata: {}",
+                            customMetadataSize, this.customMetadataSizeLimit, 
copySegmentStartedRlsm, e);
+                    try {
+                        // For deletion, we provide back the custom metadata 
by creating a new metadata object from the update.
+                        // However, the update itself will not be stored in 
this case.
+                        
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+                        logger.info("Successfully cleaned segment after custom 
metadata size exceeded");
+                    } catch (RemoteStorageException e1) {
+                        logger.error("Error while cleaning segment after 
custom metadata size exceeded, consider cleaning manually", e1);
+                    }
+                    throw e;
+                }
+            }
 
             
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
             brokerTopicStats.topicStats(log.topicPartition().topic())
@@ -883,7 +912,7 @@ public class RemoteLogManager implements Closeable {
                                             Consumer<RLMTask> 
convertToLeaderOrFollower) {
         RLMTaskWithFuture rlmTaskWithFuture = 
leaderOrFollowerTasks.computeIfAbsent(topicPartition,
                 topicIdPartition -> {
-                    RLMTask task = new RLMTask(topicIdPartition);
+                    RLMTask task = new RLMTask(topicIdPartition, 
this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
                     // set this upfront when it is getting initialized instead 
of doing it after scheduling.
                     convertToLeaderOrFollower.accept(task);
                     LOGGER.info("Created a new task: {} and getting 
scheduled", task);
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4d5bc8dbb3a..941f4dc961e 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -44,6 +44,7 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -106,7 +107,6 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -342,7 +342,8 @@ public class RemoteLogManagerTest {
         dummyFuture.complete(null);
         
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
         
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
-        
doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class));
+        
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class)))
+                .thenReturn(Optional.empty());
 
         // Verify the metrics for remote writes and for failures is zero 
before attempt to copy log segment
         assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
@@ -353,7 +354,7 @@ public class RemoteLogManagerTest {
         assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
         assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
         task.convertToLeader(2);
         task.copyLogSegmentsToRemote(mockLog);
 
@@ -397,6 +398,100 @@ public class RemoteLogManagerTest {
         assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
     }
 
+    // We are verifying that if the size of a piece of custom metadata is 
bigger than the configured limit,
+    // the copy task should be cancelled and there should be an attempt to 
delete the just copied segment.
+    @Test
+    void testCustomMetadataSizeExceedsLimit() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+        long lastStableOffset = 150L;
+        long logEndOffset = 150L;
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(-1L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+        verify(oldSegment, times(0)).readNextOffset();
+        verify(activeSegment, times(0)).readNextOffset();
+
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+        ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
+        when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+
+        LazyIndex idx = 
LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1000);
+        LazyIndex timeIdx = 
LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), 
oldSegmentStartOffset, 1500);
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
+        when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        int customMetadataSizeLimit = 128;
+        CustomMetadata customMetadata = new CustomMetadata(new 
byte[customMetadataSizeLimit * 2]);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class)))
+                .thenReturn(Optional.of(customMetadata));
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, customMetadataSizeLimit);
+        task.convertToLeader(2);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = 
ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
+        
verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
+
+        // Check we attempt to delete the segment data providing the custom 
metadata back.
+        RemoteLogSegmentMetadataUpdate expectedMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(
+                remoteLogSegmentMetadataArg.getValue().remoteLogSegmentId(), 
time.milliseconds(),
+                Optional.of(customMetadata), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+        RemoteLogSegmentMetadata expectedDeleteMetadata = 
remoteLogSegmentMetadataArg.getValue().createWithUpdates(expectedMetadataUpdate);
+        verify(remoteStorageManager, 
times(1)).deleteLogSegmentData(eq(expectedDeleteMetadata));
+
+        // Check the task is cancelled in the end.
+        assertTrue(task.isCancelled());
+
+        // The metadata update should not be posted.
+        verify(remoteLogMetadataManager, 
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+
+        // Verify the copy request was counted, but no copied bytes were recorded.
+        assertEquals(1, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
+        // Verify the failure was reported for remote writes
+        assertEquals(1, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(1, 
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
+        assertEquals(1, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+    }
+
     @Test
     void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
         long oldSegmentStartOffset = 0L;
@@ -532,7 +627,7 @@ public class RemoteLogManagerTest {
         // Verify aggregate metrics
         assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
         assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
         task.convertToLeader(2);
         task.copyLogSegmentsToRemote(mockLog);
 
@@ -572,7 +667,7 @@ public class RemoteLogManagerTest {
         when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
         when(mockLog.lastStableOffset()).thenReturn(250L);
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
         task.convertToFollower();
         task.copyLogSegmentsToRemote(mockLog);
 
@@ -714,7 +809,7 @@ public class RemoteLogManagerTest {
 
     @Test
     void testRLMTaskShouldSetLeaderEpochCorrectly() {
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
         assertFalse(task.isLeader());
         task.convertToLeader(1);
         assertTrue(task.isLeader());
@@ -862,7 +957,7 @@ public class RemoteLogManagerTest {
         when(log.logSegments(5L, Long.MAX_VALUE))
                 
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, 
segment2, activeSegment)));
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
         List<RemoteLogManager.EnrichedLogSegment> expected =
                 Arrays.asList(
                         new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
@@ -888,7 +983,7 @@ public class RemoteLogManagerTest {
         when(log.logSegments(5L, Long.MAX_VALUE))
                 
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, 
segment2, segment3, activeSegment)));
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
         List<RemoteLogManager.EnrichedLogSegment> expected =
                 Arrays.asList(
                         new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 47ae7bf21ad..9b589322bbf 100644
--- 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -19,10 +19,12 @@ package org.apache.kafka.server.log.remote.storage;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.TreeMap;
 
 /**
@@ -66,6 +68,11 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
      */
     private final int segmentSizeInBytes;
 
+    /**
+     * Custom metadata.
+     */
+    private final Optional<CustomMetadata> customMetadata;
+
     /**
      * It indicates the state in which the action is executed on this segment.
      */
@@ -84,6 +91,7 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
      * @param brokerId            Broker id from which this event is generated.
      * @param eventTimestampMs    Epoch time in milli seconds at which the 
remote log segment is copied to the remote tier storage.
      * @param segmentSizeInBytes  Size of this segment in bytes.
+     * @param customMetadata      Custom metadata.
      * @param state               State of the respective segment of 
remoteLogSegmentId.
      * @param segmentLeaderEpochs leader epochs occurred within this segment.
      */
@@ -94,6 +102,7 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
                                     int brokerId,
                                     long eventTimestampMs,
                                     int segmentSizeInBytes,
+                                    Optional<CustomMetadata> customMetadata,
                                     RemoteLogSegmentState state,
                                     Map<Integer, Long> segmentLeaderEpochs) {
         super(brokerId, eventTimestampMs);
@@ -112,6 +121,7 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
         this.endOffset = endOffset;
         this.maxTimestampMs = maxTimestampMs;
         this.segmentSizeInBytes = segmentSizeInBytes;
+        this.customMetadata = Objects.requireNonNull(customMetadata, 
"customMetadata can not be null");
 
         if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
             throw new IllegalArgumentException("segmentLeaderEpochs can not be 
null or empty");
@@ -149,6 +159,7 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
                 maxTimestampMs,
                 brokerId,
                 eventTimestampMs, segmentSizeInBytes,
+                Optional.empty(),
                 RemoteLogSegmentState.COPY_SEGMENT_STARTED,
                 segmentLeaderEpochs);
     }
@@ -196,6 +207,13 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
         return segmentLeaderEpochs;
     }
 
+    /**
+     * @return Custom metadata.
+     */
+    public Optional<CustomMetadata> customMetadata() {
+        return customMetadata;
+    }
+
     /**
      * Returns the current state of this remote log segment. It can be any of 
the below
      * <ul>
@@ -223,7 +241,7 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
 
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset,
                 endOffset, maxTimestampMs, rlsmUpdate.brokerId(), 
rlsmUpdate.eventTimestampMs(),
-                segmentSizeInBytes, rlsmUpdate.state(), segmentLeaderEpochs);
+                segmentSizeInBytes, rlsmUpdate.customMetadata(), 
rlsmUpdate.state(), segmentLeaderEpochs);
     }
 
     @Override
@@ -244,7 +262,9 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
                 && maxTimestampMs == that.maxTimestampMs
                 && segmentSizeInBytes == that.segmentSizeInBytes
                 && Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId)
-                && Objects.equals(segmentLeaderEpochs, 
that.segmentLeaderEpochs) && state == that.state
+                && Objects.equals(segmentLeaderEpochs, 
that.segmentLeaderEpochs)
+                && Objects.equals(customMetadata, that.customMetadata)
+                && state == that.state
                 && eventTimestampMs() == that.eventTimestampMs()
                 && brokerId() == that.brokerId();
     }
@@ -252,7 +272,7 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
     @Override
     public int hashCode() {
         return Objects.hash(remoteLogSegmentId, startOffset, endOffset, 
brokerId(), maxTimestampMs,
-                eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, 
state);
+                eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, 
customMetadata, state);
     }
 
     @Override
@@ -266,8 +286,57 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
                ", eventTimestampMs=" + eventTimestampMs() +
                ", segmentLeaderEpochs=" + segmentLeaderEpochs +
                ", segmentSizeInBytes=" + segmentSizeInBytes +
+               ", customMetadata=" + customMetadata +
                ", state=" + state +
                '}';
     }
 
+    /**
+     * Custom metadata from a {@link RemoteStorageManager} plugin.
+     *
+     * <p>The content of these metadata is RSM-dependent and is opaque to the 
broker, i.e.
+     * it's not interpreted, only stored along with the rest of the remote log 
segment metadata.
+     *
+     * <p>Examples of such metadata are:
+     * <ol>
+     *     <li>The storage path on the remote storage in case it's 
nondeterministic or version-dependent.</li>
+     *     <li>The actual size of all the files related to the segment on the 
remote storage.</li>
+     * </ol>
+     *
+     * <p>The maximum size the broker accepts and stores is controlled by
+     * the {@code remote.log.metadata.custom.metadata.max.bytes} setting.
+     */
+    public static class CustomMetadata {
+        private final byte[] value;
+
+        public CustomMetadata(byte[] value) {
+            this.value = value;
+        }
+
+        public byte[] value() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CustomMetadata that = (CustomMetadata) o;
+            return Arrays.equals(value, that.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(value);
+        }
+
+        @Override
+        public String toString() {
+            return "CustomMetadata{" + value.length + " bytes}";
+        }
+    }
 }
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
index a01df9602d2..210615ef53f 100644
--- 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
@@ -18,8 +18,10 @@ package org.apache.kafka.server.log.remote.storage;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * It describes the metadata update about the log segment in the remote 
storage. This is currently used to update the
@@ -34,6 +36,11 @@ public class RemoteLogSegmentMetadataUpdate extends 
RemoteLogMetadata {
      */
     private final RemoteLogSegmentId remoteLogSegmentId;
 
+    /**
+     * Custom metadata.
+     */
+    private final Optional<CustomMetadata> customMetadata;
+
     /**
      * It indicates the state in which the action is executed on this segment.
      */
@@ -42,13 +49,17 @@ public class RemoteLogSegmentMetadataUpdate extends 
RemoteLogMetadata {
     /**
      * @param remoteLogSegmentId Universally unique remote log segment id.
      * @param eventTimestampMs   Epoch time in milli seconds at which the 
remote log segment is copied to the remote tier storage.
+     * @param customMetadata     Custom metadata; may be empty but must not be null.
      * @param state              State of the remote log segment.
      * @param brokerId           Broker id from which this event is generated.
      */
     public RemoteLogSegmentMetadataUpdate(RemoteLogSegmentId 
remoteLogSegmentId, long eventTimestampMs,
-                                          RemoteLogSegmentState state, int 
brokerId) {
+                                          Optional<CustomMetadata> 
customMetadata,
+                                          RemoteLogSegmentState state,
+                                          int brokerId) {
         super(brokerId, eventTimestampMs);
         this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, 
"remoteLogSegmentId can not be null");
+        this.customMetadata = Objects.requireNonNull(customMetadata, 
"customMetadata can not be null");
         this.state = Objects.requireNonNull(state, "state can not be null");
     }
 
@@ -59,6 +70,13 @@ public class RemoteLogSegmentMetadataUpdate extends 
RemoteLogMetadata {
         return remoteLogSegmentId;
     }
 
+    /**
+     * @return Custom metadata; empty if none is present, never null.
+     */
+    public Optional<CustomMetadata> customMetadata() {
+        return customMetadata;
+    }
+
     /**
      * It represents the state of the remote log segment. It can be one of the 
values of {@link RemoteLogSegmentState}.
      */
@@ -81,6 +99,7 @@ public class RemoteLogSegmentMetadataUpdate extends 
RemoteLogMetadata {
         }
         RemoteLogSegmentMetadataUpdate that = (RemoteLogSegmentMetadataUpdate) 
o;
         return Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId) &&
+               Objects.equals(customMetadata, that.customMetadata) &&
                state == that.state &&
                eventTimestampMs() == that.eventTimestampMs() &&
                brokerId() == that.brokerId();
@@ -88,14 +107,15 @@ public class RemoteLogSegmentMetadataUpdate extends 
RemoteLogMetadata {
 
     @Override
     public int hashCode() {
-        return Objects.hash(remoteLogSegmentId, state, eventTimestampMs(), 
brokerId());
+        return Objects.hash(remoteLogSegmentId, customMetadata, state, 
eventTimestampMs(), brokerId());
     }
 
     @Override
     public String toString() {
         return "RemoteLogSegmentMetadataUpdate{" +
                "remoteLogSegmentId=" + remoteLogSegmentId +
-                ", state=" + state +
+               ", customMetadata=" + customMetadata +
+               ", state=" + state +
                ", eventTimestampMs=" + eventTimestampMs() +
                ", brokerId=" + brokerId() +
                '}';
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
index cc26109969c..fa819979b2c 100644
--- 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
@@ -18,9 +18,11 @@ package org.apache.kafka.server.log.remote.storage;
 
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 
 import java.io.Closeable;
 import java.io.InputStream;
+import java.util.Optional;
 
 /**
  * This interface provides the lifecycle of remote log segments that includes 
copy, fetch, and delete from remote
@@ -81,10 +83,11 @@ public interface RemoteStorageManager extends Configurable, 
Closeable {
      *
      * @param remoteLogSegmentMetadata metadata about the remote log segment.
      * @param logSegmentData           data to be copied to tiered storage.
+     * @return custom metadata to be added to the segment metadata after 
copying.
      * @throws RemoteStorageException if there are any errors in storing the 
data of the segment.
      */
-    void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                            LogSegmentData logSegmentData)
+    Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                                LogSegmentData logSegmentData)
             throws RemoteStorageException;
 
     /**
diff --git 
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
 
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
index 8a83033aa04..dfd905ce0cd 100644
--- 
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
+++ 
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
@@ -16,14 +16,18 @@
  */
 package org.apache.kafka.server.log.remote.storage;
 
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.Map;
+import java.util.Optional;
 
 public class NoOpRemoteStorageManager implements RemoteStorageManager {
     @Override
-    public void copyLogSegmentData(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
-                                   LogSegmentData logSegmentData) {
+    public Optional<CustomMetadata> 
copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                                       LogSegmentData 
logSegmentData) {
+        return Optional.empty();
     }
 
     @Override
diff --git 
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
 
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
new file mode 100644
index 00000000000..4cd2b350441
--- /dev/null
+++ 
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataTest {
+    private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+
+    @Test
+    void createWithUpdates() {
+        int brokerId = 0;
+        int eventTimestamp = 0;
+        int brokerIdFinished = 1;
+        int timestampFinished = 1;
+        long startOffset = 0L;
+        long endOffset = 100L;
+        int segmentSize = 123;
+        long maxTimestamp = -1L;
+
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        segmentLeaderEpochs.put(0, 0L);
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+                maxTimestamp, brokerId, eventTimestamp, segmentSize,
+                segmentLeaderEpochs);
+
+        CustomMetadata customMetadata = new CustomMetadata(new byte[]{0, 1, 2, 
3});
+        RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(
+                segmentId, timestampFinished, Optional.of(customMetadata), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                brokerIdFinished);
+        RemoteLogSegmentMetadata updatedMetadata = 
segmentMetadata.createWithUpdates(segmentMetadataUpdate);
+
+        RemoteLogSegmentMetadata expectedUpdatedMetadata = new 
RemoteLogSegmentMetadata(
+                segmentId, startOffset, endOffset,
+                maxTimestamp, brokerIdFinished, timestampFinished, 
segmentSize, Optional.of(customMetadata),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                segmentLeaderEpochs
+        );
+        assertEquals(expectedUpdatedMetadata, updatedMetadata);
+
+        // Check that the original metadata have not changed.
+        assertEquals(segmentId, segmentMetadata.remoteLogSegmentId());
+        assertEquals(startOffset, segmentMetadata.startOffset());
+        assertEquals(endOffset, segmentMetadata.endOffset());
+        assertEquals(maxTimestamp, segmentMetadata.maxTimestampMs());
+        assertEquals(brokerId, segmentMetadata.brokerId());
+        assertEquals(eventTimestamp, segmentMetadata.eventTimestampMs());
+        assertEquals(segmentSize, segmentMetadata.segmentSizeInBytes());
+        assertEquals(segmentLeaderEpochs, 
segmentMetadata.segmentLeaderEpochs());
+    }
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
index 15e4562a2da..0b0b2817061 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
@@ -77,7 +77,8 @@ public class FileBasedRemoteLogMetadataCache extends 
RemoteLogMetadataCache {
     private RemoteLogSegmentMetadata 
createRemoteLogSegmentMetadata(RemoteLogSegmentMetadataSnapshot snapshot) {
         return new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(topicIdPartition, snapshot.segmentId()), 
snapshot.startOffset(),
                                             snapshot.endOffset(), 
snapshot.maxTimestampMs(), snapshot.brokerId(), snapshot.eventTimestampMs(),
-                                            snapshot.segmentSizeInBytes(), 
snapshot.state(), snapshot.segmentLeaderEpochs());
+                                            snapshot.segmentSizeInBytes(), 
snapshot.customMetadata(), snapshot.state(), snapshot.segmentLeaderEpochs()
+        );
     }
 
     /**
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
index c936d5056f8..ec1ed6a66d1 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 
 import java.util.Collections;
@@ -27,6 +28,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.TreeMap;
+import java.util.Optional;
 
 /**
  * This class represents the entry containing the metadata about a remote log 
segment. This is similar to
@@ -68,6 +70,11 @@ public class RemoteLogSegmentMetadataSnapshot extends 
RemoteLogMetadata {
      */
     private final int segmentSizeInBytes;
 
+    /**
+     * Custom metadata.
+     */
+    private final Optional<CustomMetadata> customMetadata;
+
     /**
      * It indicates the state in which the action is executed on this segment.
      */
@@ -79,13 +86,14 @@ public class RemoteLogSegmentMetadataSnapshot extends 
RemoteLogMetadata {
      * {@code segmentLeaderEpochs} can not be empty. If all the records in 
this segment belong to the same leader epoch
      * then it should have an entry with epoch mapping to start-offset of this 
segment.
      *
-     * @param segmentId                  Universally unique remote log segment 
id.
+     * @param segmentId           Universally unique remote log segment id.
      * @param startOffset         Start offset of this segment (inclusive).
      * @param endOffset           End offset of this segment (inclusive).
      * @param maxTimestampMs      Maximum timestamp in milliseconds in this 
segment.
      * @param brokerId            Broker id from which this event is generated.
      * @param eventTimestampMs    Epoch time in milliseconds at which the 
remote log segment is copied to the remote tier storage.
      * @param segmentSizeInBytes  Size of this segment in bytes.
+     * @param customMetadata      Custom metadata; may be empty but must not be null.
      * @param state               State of the respective segment of 
remoteLogSegmentId.
      * @param segmentLeaderEpochs leader epochs occurred within this segment.
      */
@@ -96,6 +104,7 @@ public class RemoteLogSegmentMetadataSnapshot extends 
RemoteLogMetadata {
                                             int brokerId,
                                             long eventTimestampMs,
                                             int segmentSizeInBytes,
+                                            Optional<CustomMetadata> 
customMetadata,
                                             RemoteLogSegmentState state,
                                             Map<Integer, Long> 
segmentLeaderEpochs) {
         super(brokerId, eventTimestampMs);
@@ -106,6 +115,7 @@ public class RemoteLogSegmentMetadataSnapshot extends 
RemoteLogMetadata {
         this.endOffset = endOffset;
         this.maxTimestampMs = maxTimestampMs;
         this.segmentSizeInBytes = segmentSizeInBytes;
+        this.customMetadata = Objects.requireNonNull(customMetadata, 
"customMetadata can not be null");
 
         if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
             throw new IllegalArgumentException("segmentLeaderEpochs can not be 
null or empty");
@@ -117,7 +127,8 @@ public class RemoteLogSegmentMetadataSnapshot extends 
RemoteLogMetadata {
     public static RemoteLogSegmentMetadataSnapshot 
create(RemoteLogSegmentMetadata metadata) {
         return new 
RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(), 
metadata.startOffset(), metadata.endOffset(),
                                                     metadata.maxTimestampMs(), 
metadata.brokerId(), metadata.eventTimestampMs(),
-                                                    
metadata.segmentSizeInBytes(), metadata.state(), 
metadata.segmentLeaderEpochs());
+                                                    
metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(), 
metadata.segmentLeaderEpochs()
+        );
     }
 
     /**
@@ -162,6 +173,13 @@ public class RemoteLogSegmentMetadataSnapshot extends 
RemoteLogMetadata {
         return segmentLeaderEpochs;
     }
 
+    /**
+     * @return Custom metadata; empty if none is present, never null.
+     */
+    public Optional<CustomMetadata> customMetadata() {
+        return customMetadata;
+    }
+
     /**
      * Returns the current state of this remote log segment. It can be any of 
the below
      * <ul>
@@ -185,13 +203,19 @@ public class RemoteLogSegmentMetadataSnapshot extends 
RemoteLogMetadata {
         if (this == o) return true;
         if (!(o instanceof RemoteLogSegmentMetadataSnapshot)) return false;
         RemoteLogSegmentMetadataSnapshot that = 
(RemoteLogSegmentMetadataSnapshot) o;
-        return startOffset == that.startOffset && endOffset == that.endOffset 
&& maxTimestampMs == that.maxTimestampMs && segmentSizeInBytes == 
that.segmentSizeInBytes && Objects.equals(
-                segmentId, that.segmentId) && 
Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state == 
that.state;
+        return startOffset == that.startOffset
+                && endOffset == that.endOffset
+                && maxTimestampMs == that.maxTimestampMs
+                && segmentSizeInBytes == that.segmentSizeInBytes
+                && Objects.equals(customMetadata, that.customMetadata)
+                && Objects.equals(segmentId, that.segmentId)
+                && Objects.equals(segmentLeaderEpochs, 
that.segmentLeaderEpochs)
+                && state == that.state;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, 
segmentLeaderEpochs, segmentSizeInBytes, state);
+        return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, 
segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
     }
 
     @Override
@@ -203,6 +227,7 @@ public class RemoteLogSegmentMetadataSnapshot extends 
RemoteLogMetadata {
                 ", maxTimestampMs=" + maxTimestampMs +
                 ", segmentLeaderEpochs=" + segmentLeaderEpochs +
                 ", segmentSizeInBytes=" + segmentSizeInBytes +
+                ", customMetadata=" + customMetadata +
                 ", state=" + state +
                 '}';
     }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
index bd613f8c9cd..ad47ee05c84 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
@@ -19,11 +19,13 @@ package 
org.apache.kafka.server.log.remote.metadata.storage.serialization;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
 import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataSnapshotRecord;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class RemoteLogSegmentMetadataSnapshotTransform implements 
RemoteLogMetadataTransform<RemoteLogSegmentMetadataSnapshot> {
@@ -39,6 +41,7 @@ public class RemoteLogSegmentMetadataSnapshotTransform 
implements RemoteLogMetad
                 .setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
                 
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs()))
                 .setRemoteLogSegmentState(segmentMetadata.state().id());
+        segmentMetadata.customMetadata().ifPresent(md -> 
record.setCustomMetadata(md.value()));
 
         return new ApiMessageAndVersion(record, 
record.highestSupportedVersion());
     }
@@ -59,6 +62,7 @@ public class RemoteLogSegmentMetadataSnapshotTransform 
implements RemoteLogMetad
             segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), 
segmentLeaderEpoch.offset());
         }
 
+        Optional<CustomMetadata> customMetadata = 
Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
         return new RemoteLogSegmentMetadataSnapshot(record.segmentId(),
                                                     record.startOffset(),
                                                     record.endOffset(),
@@ -66,6 +70,7 @@ public class RemoteLogSegmentMetadataSnapshotTransform 
implements RemoteLogMetad
                                                     record.brokerId(),
                                                     record.eventTimestampMs(),
                                                     
record.segmentSizeInBytes(),
+                                                    customMetadata,
                                                     
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
                                                     segmentLeaderEpochs);
     }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index 4282b9e71f3..9e893d2cbc3 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -22,12 +22,14 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class RemoteLogSegmentMetadataTransform implements 
RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {
@@ -43,6 +45,7 @@ public class RemoteLogSegmentMetadataTransform implements 
RemoteLogMetadataTrans
                 .setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
                 
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata))
                 .setRemoteLogSegmentState(segmentMetadata.state().id());
+        segmentMetadata.customMetadata().ifPresent(md -> 
record.setCustomMetadata(md.value()));
 
         return new ApiMessageAndVersion(record, 
record.highestSupportedVersion());
     }
@@ -75,6 +78,7 @@ public class RemoteLogSegmentMetadataTransform implements 
RemoteLogMetadataTrans
             segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), 
segmentLeaderEpoch.offset());
         }
 
+        Optional<CustomMetadata> customMetadata = 
Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
         RemoteLogSegmentMetadata remoteLogSegmentMetadata =
                 new RemoteLogSegmentMetadata(remoteLogSegmentId, 
record.startOffset(), record.endOffset(),
                                              record.maxTimestampMs(), 
record.brokerId(),
@@ -82,6 +86,7 @@ public class RemoteLogSegmentMetadataTransform implements 
RemoteLogMetadataTrans
                                              segmentLeaderEpochs);
         RemoteLogSegmentMetadataUpdate rlsmUpdate
                 = new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, 
record.eventTimestampMs(),
+                                                     customMetadata,
                                                      
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
                                                      record.brokerId());
 
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
index 3db776520c6..e2d2bf8049c 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
@@ -21,9 +21,12 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 
+import java.util.Optional;
+
 public class RemoteLogSegmentMetadataUpdateTransform implements 
RemoteLogMetadataTransform<RemoteLogSegmentMetadataUpdate> {
 
     public ApiMessageAndVersion 
toApiMessageAndVersion(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) {
@@ -32,6 +35,7 @@ public class RemoteLogSegmentMetadataUpdateTransform 
implements RemoteLogMetadat
                 .setBrokerId(segmentMetadataUpdate.brokerId())
                 .setEventTimestampMs(segmentMetadataUpdate.eventTimestampMs())
                 .setRemoteLogSegmentState(segmentMetadataUpdate.state().id());
+        segmentMetadataUpdate.customMetadata().ifPresent(md -> 
record.setCustomMetadata(md.value()));
 
         return new ApiMessageAndVersion(record, 
record.highestSupportedVersion());
     }
@@ -42,8 +46,9 @@ public class RemoteLogSegmentMetadataUpdateTransform 
implements RemoteLogMetadat
         TopicIdPartition topicIdPartition = new 
TopicIdPartition(entry.topicIdPartition().id(),
                 new TopicPartition(entry.topicIdPartition().name(), 
entry.topicIdPartition().partition()));
 
+        Optional<CustomMetadata> customMetadata = 
Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
         return new RemoteLogSegmentMetadataUpdate(new 
RemoteLogSegmentId(topicIdPartition, entry.id()),
-                record.eventTimestampMs(), 
RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
+                record.eventTimestampMs(), customMetadata, 
RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
     }
 
     private RemoteLogSegmentMetadataUpdateRecord.RemoteLogSegmentIdEntry 
createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadataUpdate data) {
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
index 6284e4cb78e..0260264e344 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
@@ -18,9 +18,12 @@ package org.apache.kafka.server.log.remote.storage;
 
 import org.apache.kafka.storage.internals.log.StorageAction;
 
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * A wrapper class of {@link RemoteStorageManager} that sets the context class 
loader when calling the respective
@@ -66,12 +69,9 @@ public class ClassLoaderAwareRemoteStorageManager implements 
RemoteStorageManage
         }
     }
 
-    public void copyLogSegmentData(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+    public Optional<CustomMetadata> 
copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    LogSegmentData logSegmentData) throws 
RemoteStorageException {
-        withClassLoader(() -> {
-            delegate.copyLogSegmentData(remoteLogSegmentMetadata, 
logSegmentData);
-            return null;
-        });
+        return withClassLoader(() -> 
delegate.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData));
     }
 
     @Override
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 17a2746fdb3..1167ee73c66 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -81,6 +81,13 @@ public final class RemoteLogManagerConfig {
     public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = 
"Listener name of the local broker to which it should get connected if " +
             "needed by RemoteLogMetadataManager implementation.";
 
+    public static final String 
REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP = 
"remote.log.metadata.custom.metadata.max.bytes";
+    public static final String 
REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC = "The maximum size of custom 
metadata in bytes that the broker " +
+            "should accept from a remote storage plugin. If custom  metadata 
exceeds this limit, the updated segment metadata " +
+            "will not be stored, the copied data will be attempted to delete, 
" +
+            "and the remote copying task for this topic-partition will stop 
with an error.";
+    public static final int 
DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES = 128;
+
     public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP = 
"remote.log.index.file.cache.total.size.bytes";
     public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space 
allocated to store index files fetched " +
             "from remote storage in the local storage.";
@@ -181,6 +188,12 @@ public final class RemoteLogManagerConfig {
                                   new ConfigDef.NonEmptyString(),
                                   MEDIUM,
                                   
REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
+                  
.defineInternal(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+                                  INT,
+                                  
DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES,
+                                  atLeast(0),
+                                  LOW,
+                                  
REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
                   
.defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
                                   LONG,
                                   
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
@@ -260,6 +273,7 @@ public final class RemoteLogManagerConfig {
     private final String remoteLogMetadataManagerPrefix;
     private final HashMap<String, Object> remoteLogMetadataManagerProps;
     private final String remoteLogMetadataManagerListenerName;
+    private final int remoteLogMetadataCustomMetadataMaxBytes;
 
     public RemoteLogManagerConfig(AbstractConfig config) {
         this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
@@ -276,6 +290,7 @@ public final class RemoteLogManagerConfig {
              config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP),
              config.getInt(REMOTE_LOG_READER_THREADS_PROP),
              config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
+             config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP),
              config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
              config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != 
null
                  ? 
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP))
@@ -301,6 +316,7 @@ public final class RemoteLogManagerConfig {
                                   double remoteLogManagerTaskRetryJitter,
                                   int remoteLogReaderThreads,
                                   int remoteLogReaderMaxPendingTasks,
+                                  int remoteLogMetadataCustomMetadataMaxBytes,
                                   String remoteStorageManagerPrefix,
                                   Map<String, Object> 
remoteStorageManagerProps, /* properties having keys stripped out with 
remoteStorageManagerPrefix */
                                   String remoteLogMetadataManagerPrefix,
@@ -324,6 +340,7 @@ public final class RemoteLogManagerConfig {
         this.remoteLogMetadataManagerPrefix = remoteLogMetadataManagerPrefix;
         this.remoteLogMetadataManagerProps = new 
HashMap<>(remoteLogMetadataManagerProps);
         this.remoteLogMetadataManagerListenerName = 
remoteLogMetadataManagerListenerName;
+        this.remoteLogMetadataCustomMetadataMaxBytes = 
remoteLogMetadataCustomMetadataMaxBytes;
     }
 
     public boolean enableRemoteStorageSystem() {
@@ -382,6 +399,10 @@ public final class RemoteLogManagerConfig {
         return remoteLogMetadataManagerListenerName;
     }
 
+    public int remoteLogMetadataCustomMetadataMaxBytes() {
+        return remoteLogMetadataCustomMetadataMaxBytes;
+    }
+
     public String remoteStorageManagerPrefix() {
         return remoteStorageManagerPrefix;
     }
@@ -412,6 +433,7 @@ public final class RemoteLogManagerConfig {
                 && remoteLogManagerTaskRetryJitter == 
that.remoteLogManagerTaskRetryJitter
                 && remoteLogReaderThreads == that.remoteLogReaderThreads
                 && remoteLogReaderMaxPendingTasks == 
that.remoteLogReaderMaxPendingTasks
+                && remoteLogMetadataCustomMetadataMaxBytes == 
that.remoteLogMetadataCustomMetadataMaxBytes
                 && Objects.equals(remoteStorageManagerClassName, 
that.remoteStorageManagerClassName)
                 && Objects.equals(remoteStorageManagerClassPath, 
that.remoteStorageManagerClassPath)
                 && Objects.equals(remoteLogMetadataManagerClassName, 
that.remoteLogMetadataManagerClassName)
@@ -427,7 +449,7 @@ public final class RemoteLogManagerConfig {
     public int hashCode() {
         return Objects.hash(enableRemoteStorageSystem, 
remoteStorageManagerClassName, remoteStorageManagerClassPath,
                             remoteLogMetadataManagerClassName, 
remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
-                            remoteLogIndexFileCacheTotalSizeBytes, 
remoteLogManagerThreadPoolSize, remoteLogManagerTaskIntervalMs,
+                            remoteLogMetadataCustomMetadataMaxBytes, 
remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, 
remoteLogManagerTaskIntervalMs,
                             remoteLogManagerTaskRetryBackoffMs, 
remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
                             remoteLogReaderThreads, 
remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, 
remoteLogMetadataManagerProps,
                             remoteStorageManagerPrefix, 
remoteLogMetadataManagerPrefix);
diff --git 
a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json 
b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
index d18144e4dfe..c737135a6a2 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
@@ -116,6 +116,14 @@
       "versions": "0+",
       "about": "Segment size in bytes."
     },
+    {
+      "name": "CustomMetadata",
+      "type": "bytes",
+      "default": "null",
+      "versions": "0+",
+      "nullableVersions": "0+",
+      "about": "Custom metadata."
+    },
     {
       "name": "RemoteLogSegmentState",
       "type": "int8",
diff --git 
a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
 
b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
index dbb29139c19..20fb1732572 100644
--- 
a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
+++ 
b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
@@ -82,6 +82,14 @@
       "versions": "0+",
       "about": "Segment size in bytes"
     },
+    {
+      "name": "CustomMetadata",
+      "type": "bytes",
+      "default": "null",
+      "versions": "0+",
+      "nullableVersions": "0+",
+      "about": "Custom metadata."
+    },
     {
       "name": "RemoteLogSegmentState",
       "type": "int8",
diff --git 
a/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json 
b/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
index 24003dcbce8..48aa34d4e91 100644
--- 
a/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
+++ 
b/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
@@ -72,6 +72,14 @@
       "versions": "0+",
       "about": "Epoch time in milli seconds at which this event is generated."
     },
+    {
+      "name": "CustomMetadata",
+      "type": "bytes",
+      "default": "null",
+      "versions": "0+",
+      "nullableVersions": "0+",
+      "about": "Custom metadata."
+    },
     {
       "name": "RemoteLogSegmentState",
       "type": "int8",
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
index 5f77417a458..d5341e07b07 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.test.TestUtils;
@@ -50,8 +51,10 @@ public class FileBasedRemoteLogMetadataCacheTest {
                                                                           0, 
100, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
                                                                           1024 
* 1024, Collections.singletonMap(0, 0L));
         cache.addCopyInProgressSegment(metadata1);
-        RemoteLogSegmentMetadataUpdate metadataUpdate1 = new 
RemoteLogSegmentMetadataUpdate(segmentId1, System.currentTimeMillis(),
-                                                                               
            RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+        RemoteLogSegmentMetadataUpdate metadataUpdate1 = new 
RemoteLogSegmentMetadataUpdate(
+                segmentId1, System.currentTimeMillis(),
+                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
         cache.updateRemoteLogSegmentMetadata(metadataUpdate1);
         Optional<RemoteLogSegmentMetadata> receivedMetadata = 
cache.remoteLogSegmentMetadata(0, 0L);
         assertTrue(receivedMetadata.isPresent());
@@ -63,8 +66,10 @@ public class FileBasedRemoteLogMetadataCacheTest {
                                                                           0, 
900, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
                                                                           1024 
* 1024, Collections.singletonMap(0, 0L));
         cache.addCopyInProgressSegment(metadata2);
-        RemoteLogSegmentMetadataUpdate metadataUpdate2 = new 
RemoteLogSegmentMetadataUpdate(segmentId2, System.currentTimeMillis(),
-                                                                               
            RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+        RemoteLogSegmentMetadataUpdate metadataUpdate2 = new 
RemoteLogSegmentMetadataUpdate(
+                segmentId2, System.currentTimeMillis(),
+                Optional.of(new CustomMetadata(new byte[]{4, 5, 6, 7})),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
         cache.updateRemoteLogSegmentMetadata(metadataUpdate2);
 
         // Fetch segment for leader epoch:0 and start offset:0, it should be 
the newly added segment.
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
index 789997feed2..6fe08462803 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 
 public class RemoteLogMetadataCacheTest {
 
@@ -57,7 +58,7 @@ public class RemoteLogMetadataCacheTest {
                         -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, 
Collections.singletonMap(0, 0L));
                 RemoteLogSegmentMetadata updatedMetadata = segmentMetadata
                         .createWithUpdates(new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
-                                time.milliseconds(), state, BROKER_ID_1));
+                                time.milliseconds(), Optional.empty(), state, 
BROKER_ID_1));
                 Assertions.assertThrows(IllegalArgumentException.class, () ->
                         cache.addCopyInProgressSegment(updatedMetadata));
             }
@@ -67,7 +68,9 @@ public class RemoteLogMetadataCacheTest {
         Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> {
             RemoteLogSegmentId nonExistingId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
             cache.updateRemoteLogSegmentMetadata(new 
RemoteLogSegmentMetadataUpdate(nonExistingId,
-                    time.milliseconds(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
+                    time.milliseconds(),
+                    Optional.empty(),
+                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, 
BROKER_ID_1));
         });
 
         // Check for invalid state transition.
@@ -75,7 +78,9 @@ public class RemoteLogMetadataCacheTest {
             RemoteLogSegmentMetadata segmentMetadata = 
createSegmentUpdateWithState(cache, Collections.singletonMap(0, 0L), 0,
                     100, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
             cache.updateRemoteLogSegmentMetadata(new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
-                    time.milliseconds(), 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
+                    time.milliseconds(),
+                    Optional.empty(),
+                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, 
BROKER_ID_1));
         });
     }
 
@@ -90,8 +95,11 @@ public class RemoteLogMetadataCacheTest {
                                                                                
 BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
         cache.addCopyInProgressSegment(segmentMetadata);
 
-        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId,
-                                                                               
               time.milliseconds(), state, BROKER_ID_1);
+        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(
+                segmentId,
+                time.milliseconds(),
+                Optional.empty(),
+                state, BROKER_ID_1);
         cache.updateRemoteLogSegmentMetadata(segMetadataUpdate);
 
         return segmentMetadata.createWithUpdates(segMetadataUpdate);
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index 3f9db8cee13..e3d1a2aee0c 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.Uuid;
 import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -30,6 +32,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -46,15 +49,22 @@ public class RemoteLogMetadataFormatterTest {
         segLeaderEpochs.put(1, 20L);
         segLeaderEpochs.put(2, 80L);
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
SEGMENT_ID);
+        Optional<CustomMetadata> customMetadata = Optional.of(new 
CustomMetadata(new byte[10]));
         RemoteLogSegmentMetadata remoteLogMetadata = new 
RemoteLogSegmentMetadata(
                 remoteLogSegmentId, 0L, 100L, -1L, 1,
-                123L, 1024, segLeaderEpochs);
+                123L, 1024, customMetadata,
+                RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs);
 
         byte[] metadataBytes = new 
RemoteLogMetadataSerde().serialize(remoteLogMetadata);
         ConsumerRecord<byte[], byte[]> metadataRecord = new 
ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes);
 
         String expected = String.format(
-                "partition: 0, offset: 0, value: 
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=%s:foo-0,
 id=%s}, startOffset=0, endOffset=100, brokerId=1, maxTimestampMs=-1, 
eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, 
segmentSizeInBytes=1024, state=COPY_SEGMENT_STARTED}\n",
+                "partition: 0, offset: 0, value: " +
+                        
"RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=%s:foo-0,
 id=%s}, " +
+                        "startOffset=0, endOffset=100, brokerId=1, 
maxTimestampMs=-1, " +
+                        "eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 
2=80}, segmentSizeInBytes=1024, " +
+                        "customMetadata=Optional[CustomMetadata{10 bytes}], " +
+                        "state=COPY_SEGMENT_STARTED}\n",
                 TOPIC_ID, SEGMENT_ID);
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
              PrintStream ps = new PrintStream(baos)) {
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index 402d1a2994a..5b48790c7fd 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time;
 import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
@@ -34,6 +35,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class RemoteLogMetadataSerdeTest {
 
@@ -69,12 +71,17 @@ public class RemoteLogMetadataSerdeTest {
         segLeaderEpochs.put(2, 80L);
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 
1,
-                                            time.milliseconds(), 1024, 
segLeaderEpochs);
+                                            time.milliseconds(), 1024,
+                                            Optional.of(new CustomMetadata(new 
byte[] {0, 1, 2, 3})),
+                                            
RemoteLogSegmentState.COPY_SEGMENT_STARTED,
+                                            segLeaderEpochs
+        );
     }
 
     private RemoteLogSegmentMetadataUpdate 
createRemoteLogSegmentMetadataUpdate() {
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
         return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, 
time.milliseconds(),
+                                                  Optional.of(new 
CustomMetadata(new byte[] {0, 1, 2, 3})),
                                                   
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
     }
 
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
index 1b4602887be..dbfbbf3b044 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Assertions;
@@ -61,10 +62,13 @@ public class RemoteLogMetadataSnapshotFileTest {
         long startOffset = 0;
         for (int i = 0; i < 100; i++) {
             long endOffset = startOffset + 100L;
+            CustomMetadata customMetadata = new CustomMetadata(new 
byte[]{(byte) i});
             remoteLogSegmentMetadatas.add(
                     new RemoteLogSegmentMetadataSnapshot(Uuid.randomUuid(), 
startOffset, endOffset,
                                                          
System.currentTimeMillis(), 1, 100, 1024,
-                                                         
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, 
startOffset)));
+                                                         
Optional.of(customMetadata),
+                                                         
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, 
startOffset)
+                    ));
             startOffset = endOffset + 1;
         }
 
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
index 87e76833293..504f47e17a5 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
@@ -27,6 +27,7 @@ import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteL
 import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemotePartitionDeleteMetadataTransform;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
@@ -35,6 +36,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.Optional;
 
 public class RemoteLogMetadataTransformTest {
     private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
@@ -58,6 +60,7 @@ public class RemoteLogMetadataTransformTest {
 
         RemoteLogSegmentMetadataUpdate metadataUpdate =
                 new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, 
Uuid.randomUuid()), time.milliseconds(),
+                                                   Optional.of(new 
CustomMetadata(new byte[]{0, 1, 2, 3})),
                                                    
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
         ApiMessageAndVersion apiMessageAndVersion = 
metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate);
         RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = 
metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index b847e7cba3f..6928c6e281f 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -92,7 +92,8 @@ public class RemoteLogSegmentLifecycleTest {
             });
 
             RemoteLogSegmentMetadataUpdate segment0Update = new 
RemoteLogSegmentMetadataUpdate(
-                    segment0Id, time.milliseconds(), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+                    segment0Id, time.milliseconds(), Optional.empty(),
+                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
             
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segment0Update);
             RemoteLogSegmentMetadata expectedSegment0Metadata = 
segment0Metadata.createWithUpdates(segment0Update);
 
@@ -167,6 +168,7 @@ public class RemoteLogSegmentLifecycleTest {
             remoteLogSegmentLifecycleManager
                     .updateRemoteLogSegmentMetadata(new 
RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
                                                                                
        time.milliseconds(),
+                                                                               
        Optional.empty(),
                                                                                
        RemoteLogSegmentState.DELETE_SEGMENT_STARTED,
                                                                                
        BROKER_ID_1));
             
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0,
 10).isPresent());
@@ -176,6 +178,7 @@ public class RemoteLogSegmentLifecycleTest {
             remoteLogSegmentLifecycleManager
                     .updateRemoteLogSegmentMetadata(new 
RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
                                                                                
        time.milliseconds(),
+                                                                               
        Optional.empty(),
                                                                                
        RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
                                                                                
        BROKER_ID_1));
             
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0,
 10).isPresent());
@@ -218,7 +221,9 @@ public class RemoteLogSegmentLifecycleTest {
                                                                                
 time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
         
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
 
-        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), state, 
BROKER_ID_1);
+        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
+                Optional.empty(),
+                state, BROKER_ID_1);
         
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
 
         return segmentMetadata.createWithUpdates(segMetadataUpdate);
@@ -367,6 +372,7 @@ public class RemoteLogSegmentLifecycleTest {
 
             RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
                                                                                
                       time.milliseconds(),
+                                                                               
                       Optional.empty(),
                                                                                
                       RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
                                                                                
                       BROKER_ID_1);
             
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
new file mode 100644
index 00000000000..17d424249ff
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataSnapshotTransformTest {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        segmentLeaderEpochs.put(0, 0L);
+        RemoteLogSegmentMetadataSnapshot snapshot = new 
RemoteLogSegmentMetadataSnapshot(
+                Uuid.randomUuid(),
+                0L, 100L, -1L, 0, 0, 1234,
+                customMetadata,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                segmentLeaderEpochs
+        );
+
+        RemoteLogSegmentMetadataSnapshotTransform transform = new 
RemoteLogSegmentMetadataSnapshotTransform();
+        ApiMessageAndVersion message = 
transform.toApiMessageAndVersion(snapshot);
+        assertEquals(snapshot, transform.fromApiMessageAndVersion(message));
+    }
+
+    private static Stream<Object> parameters() {
+        return Stream.of(
+                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+                Optional.of(new CustomMetadata(new byte[0])),
+                Optional.empty()
+        );
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
new file mode 100644
index 00000000000..84324e7435b
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataTransformTest {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        segmentLeaderEpochs.put(0, 0L);
+        RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(), 0, "topic"), Uuid.randomUuid()),
+                0L, 100L, -1L, 0, 0, 1234,
+                customMetadata,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                segmentLeaderEpochs
+        );
+
+        RemoteLogSegmentMetadataTransform transform = new RemoteLogSegmentMetadataTransform();
+        ApiMessageAndVersion message = transform.toApiMessageAndVersion(metadata);
+        assertEquals(metadata, transform.fromApiMessageAndVersion(message));
+    }
+
+    private static Stream<Object> parameters() {
+        return Stream.of(
+                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+                Optional.of(new CustomMetadata(new byte[0])),
+                Optional.empty()
+        );
+    }
+}
\ No newline at end of file
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java
new file mode 100644
index 00000000000..09f1b7620a0
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataUpdateTransformTest {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        segmentLeaderEpochs.put(0, 0L);
+        RemoteLogSegmentMetadataUpdate metadataUpdate = new RemoteLogSegmentMetadataUpdate(
+                new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(), 0, "topic"), Uuid.randomUuid()),
+                123L,
+                customMetadata,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                1
+        );
+
+        RemoteLogSegmentMetadataUpdateTransform transform = new RemoteLogSegmentMetadataUpdateTransform();
+        ApiMessageAndVersion message = transform.toApiMessageAndVersion(metadataUpdate);
+        assertEquals(metadataUpdate, transform.fromApiMessageAndVersion(message));
+    }
+
+    private static Stream<Object> parameters() {
+        return Stream.of(
+                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+                Optional.of(new CustomMetadata(new byte[0])),
+                Optional.empty()
+        );
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
index c9541f66fec..8650cea5d47 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
@@ -26,8 +26,11 @@ import java.nio.file.Files;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
 /**
  * This class is an implementation of {@link RemoteStorageManager} backed by in-memory store.
  */
@@ -52,8 +55,8 @@ public class InmemoryRemoteStorageManager implements RemoteStorageManager {
     }
 
     @Override
-    public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                   LogSegmentData logSegmentData)
+    public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                                       LogSegmentData logSegmentData)
             throws RemoteStorageException {
         log.debug("copying log segment and indexes for : {}", remoteLogSegmentMetadata);
         Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
@@ -83,6 +86,7 @@ public class InmemoryRemoteStorageManager implements RemoteStorageManager {
             throw new RemoteStorageException(e);
         }
         log.debug("copied log segment and indexes for : {} successfully.", remoteLogSegmentMetadata);
+        return Optional.empty();
     }
 
     @Override
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index 058d8a5f649..a8847e5bba9 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
@@ -302,9 +303,9 @@ public final class LocalTieredStorage implements RemoteStorageManager {
     }
 
     @Override
-    public void copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final LogSegmentData data)
+    public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final LogSegmentData data)
             throws RemoteStorageException {
-        Callable<Void> callable = () -> {
+        Callable<Optional<CustomMetadata>> callable = () -> {
             final RemoteLogSegmentId id = metadata.remoteLogSegmentId();
             final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(COPY_SEGMENT, id);
             RemoteLogSegmentFileset fileset = null;
@@ -331,10 +332,10 @@ public final class LocalTieredStorage implements RemoteStorageManager {
                 throw e;
             }
 
-            return null;
+            return Optional.empty();
         };
 
-        wrap(callable);
+        return wrap(callable);
     }
 
     @Override
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index bb3c2ff73c4..c8b428ca547 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -41,7 +41,7 @@ public class RemoteLogManagerConfigTest {
         RemoteLogManagerConfig expectedRemoteLogManagerConfig
                 = new RemoteLogManagerConfig(true, 
"dummy.remote.storage.class", "dummy.remote.storage.class.path",
                                              
"dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path",
-                                             "listener.name", 1024 * 1024L, 1, 
60000L, 100L, 60000L, 0.3, 10, 100,
+                                             "listener.name", 1024 * 1024L, 1, 
60000L, 100L, 60000L, 0.3, 10, 100, 100,
                                              rsmPrefix, rsmProps, rlmmPrefix, 
rlmmProps);
 
         Map<String, Object> props = extractProps(expectedRemoteLogManagerConfig);
@@ -82,6 +82,8 @@ public class RemoteLogManagerConfigTest {
                   remoteLogManagerConfig.remoteLogReaderThreads());
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
                   remoteLogManagerConfig.remoteLogReaderMaxPendingTasks());
+        
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+                  
remoteLogManagerConfig.remoteLogMetadataCustomMetadataMaxBytes());
         
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
                   remoteLogManagerConfig.remoteStorageManagerPrefix());
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 95521c42dcf..528843a90f6 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -69,7 +69,8 @@ public class RemoteLogMetadataManagerTest {
 
             // 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available.
             RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
-                                                                               
                       RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                    Optional.empty(),
+                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
                                                                                
                       BROKER_ID_1);
             // Wait until the segment is updated successfully.
             
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
@@ -108,7 +109,7 @@ public class RemoteLogMetadataManagerTest {
             
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
 
             RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
-                    segmentId, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+                    segmentId, time.milliseconds(), Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
             // Wait until the segment is updated successfully.
             
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
 

Reply via email to