This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b3db905b27f KAFKA-15107: Support custom metadata for remote log
segment (#13984)
b3db905b27f is described below
commit b3db905b27ff4133f4018ac922c9ce2beb2d6087
Author: Ivan Yurchenko <[email protected]>
AuthorDate: Fri Aug 4 15:53:25 2023 +0300
KAFKA-15107: Support custom metadata for remote log segment (#13984)
* KAFKA-15107: Support custom metadata for remote log segment
This commit implements the changes discussed in KIP-917. Mainly, it changes the
`RemoteStorageManager` interface so that it returns `CustomMetadata`, and then
ensures these custom metadata are stored, propagated, and (de-)serialized correctly
along with the standard metadata throughout the whole lifecycle. It introduces
the `remote.log.metadata.custom.metadata.max.bytes` setting to limit the custom
metadata size accepted by the broker and to stop uploading in case a piece of
metadata exceeds this limit.
On testing:
1. `RemoteLogManagerTest` checks the case when a piece of custom metadata
is larger than the configured limit.
2. `RemoteLogSegmentMetadataTest` checks if `createWithUpdates` works
correctly, including custom metadata.
3. `RemoteLogSegmentMetadataTransformTest`,
`RemoteLogSegmentMetadataSnapshotTransformTest`, and
`RemoteLogSegmentMetadataUpdateTransformTest` were added to test
(de-)serialization of the corresponding classes, including custom metadata.
4. `FileBasedRemoteLogMetadataCacheTest` checks if custom metadata are
correctly saved to and loaded from a file (indirectly, via `equals`).
5. `RemoteLogManagerConfigTest` checks if the configuration setting is
handled correctly.
Reviewers: Luke Chen <[email protected]>, Satish Duggana
<[email protected]>, Divij Vaidya <[email protected]>
---
checkstyle/suppressions.xml | 1 +
.../CustomMetadataSizeLimitExceededException.java | 20 ++++
.../java/kafka/log/remote/RemoteLogManager.java | 39 +++++++-
.../kafka/log/remote/RemoteLogManagerTest.java | 111 +++++++++++++++++++--
.../remote/storage/RemoteLogSegmentMetadata.java | 75 +++++++++++++-
.../storage/RemoteLogSegmentMetadataUpdate.java | 26 ++++-
.../log/remote/storage/RemoteStorageManager.java | 7 +-
.../remote/storage/NoOpRemoteStorageManager.java | 8 +-
.../storage/RemoteLogSegmentMetadataTest.java | 77 ++++++++++++++
.../storage/FileBasedRemoteLogMetadataCache.java | 3 +-
.../storage/RemoteLogSegmentMetadataSnapshot.java | 35 ++++++-
.../RemoteLogSegmentMetadataSnapshotTransform.java | 5 +
.../RemoteLogSegmentMetadataTransform.java | 5 +
.../RemoteLogSegmentMetadataUpdateTransform.java | 7 +-
.../ClassLoaderAwareRemoteStorageManager.java | 10 +-
.../log/remote/storage/RemoteLogManagerConfig.java | 24 ++++-
.../message/RemoteLogSegmentMetadataRecord.json | 8 ++
.../RemoteLogSegmentMetadataSnapshotRecord.json | 8 ++
.../RemoteLogSegmentMetadataUpdateRecord.json | 8 ++
.../FileBasedRemoteLogMetadataCacheTest.java | 13 ++-
.../storage/RemoteLogMetadataCacheTest.java | 18 +++-
.../storage/RemoteLogMetadataFormatterTest.java | 14 ++-
.../storage/RemoteLogMetadataSerdeTest.java | 9 +-
.../storage/RemoteLogMetadataSnapshotFileTest.java | 6 +-
.../storage/RemoteLogMetadataTransformTest.java | 3 +
.../storage/RemoteLogSegmentLifecycleTest.java | 10 +-
...oteLogSegmentMetadataSnapshotTransformTest.java | 60 +++++++++++
.../RemoteLogSegmentMetadataTransformTest.java | 62 ++++++++++++
...emoteLogSegmentMetadataUpdateTransformTest.java | 62 ++++++++++++
.../storage/InmemoryRemoteStorageManager.java | 8 +-
.../log/remote/storage/LocalTieredStorage.java | 9 +-
.../remote/storage/RemoteLogManagerConfigTest.java | 4 +-
.../storage/RemoteLogMetadataManagerTest.java | 5 +-
33 files changed, 700 insertions(+), 60 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6982920e155..e01e7094a16 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -40,6 +40,7 @@
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity"
files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
<suppress
checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling"
files="(RemoteLogManager|RemoteLogManagerTest).java"/>
+ <suppress checks="ClassFanOutComplexity"
files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
diff --git
a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
b/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
new file mode 100644
index 00000000000..c893f3488de
--- /dev/null
+++
b/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+class CustomMetadataSizeLimitExceededException extends Exception {
+}
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6b723aa38fa..33bde33882f 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -47,6 +47,7 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -480,12 +481,14 @@ public class RemoteLogManager implements Closeable {
class RLMTask extends CancellableRunnable {
private final TopicIdPartition topicIdPartition;
+ private final int customMetadataSizeLimit;
private final Logger logger;
private volatile int leaderEpoch = -1;
- public RLMTask(TopicIdPartition topicIdPartition) {
+ public RLMTask(TopicIdPartition topicIdPartition, int
customMetadataSizeLimit) {
this.topicIdPartition = topicIdPartition;
+ this.customMetadataSizeLimit = customMetadataSizeLimit;
LogContext logContext = new LogContext("[RemoteLogManager=" +
brokerId + " partition=" + topicIdPartition + "] ");
logger = logContext.logger(RLMTask.class);
}
@@ -586,6 +589,11 @@ public class RemoteLogManager implements Closeable {
} else {
logger.debug("Skipping copying segments, current
read-offset:{}, and LSO:{}", copiedOffset, lso);
}
+ } catch (CustomMetadataSizeLimitExceededException e) {
+ // Only stop this task. Logging is done where the exception is
thrown.
+
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
+
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
+ this.cancel();
} catch (InterruptedException ex) {
throw ex;
} catch (Exception ex) {
@@ -597,7 +605,8 @@ public class RemoteLogManager implements Closeable {
}
}
- private void copyLogSegment(UnifiedLog log, LogSegment segment, long
nextSegmentBaseOffset) throws InterruptedException, ExecutionException,
RemoteStorageException, IOException {
+ private void copyLogSegment(UnifiedLog log, LogSegment segment, long
nextSegmentBaseOffset) throws InterruptedException, ExecutionException,
RemoteStorageException, IOException,
+ CustomMetadataSizeLimitExceededException {
File logFile = segment.log().file();
String logFileName = logFile.getName();
@@ -623,10 +632,30 @@ public class RemoteLogManager implements Closeable {
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
- remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm,
segmentData);
+ Optional<CustomMetadata> customMetadata =
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
- RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ customMetadata,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+ if (customMetadata.isPresent()) {
+ long customMetadataSize = customMetadata.get().value().length;
+ if (customMetadataSize > this.customMetadataSizeLimit) {
+ CustomMetadataSizeLimitExceededException e = new
CustomMetadataSizeLimitExceededException();
+ logger.error("Custom metadata size {} exceeds configured
limit {}." +
+ " Copying will be stopped and copied
segment will be attempted to clean." +
+ " Original metadata: {}",
+ customMetadataSize, this.customMetadataSizeLimit,
copySegmentStartedRlsm, e);
+ try {
+ // For deletion, we provide back the custom metadata
by creating a new metadata object from the update.
+ // However, the update itself will not be stored in
this case.
+
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+ logger.info("Successfully cleaned segment after custom
metadata size exceeded");
+ } catch (RemoteStorageException e1) {
+ logger.error("Error while cleaning segment after
custom metadata size exceeded, consider cleaning manually", e1);
+ }
+ throw e;
+ }
+ }
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
brokerTopicStats.topicStats(log.topicPartition().topic())
@@ -883,7 +912,7 @@ public class RemoteLogManager implements Closeable {
Consumer<RLMTask>
convertToLeaderOrFollower) {
RLMTaskWithFuture rlmTaskWithFuture =
leaderOrFollowerTasks.computeIfAbsent(topicPartition,
topicIdPartition -> {
- RLMTask task = new RLMTask(topicIdPartition);
+ RLMTask task = new RLMTask(topicIdPartition,
this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
// set this upfront when it is getting initialized instead
of doing it after scheduling.
convertToLeaderOrFollower.accept(task);
LOGGER.info("Created a new task: {} and getting
scheduled", task);
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4d5bc8dbb3a..941f4dc961e 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -44,6 +44,7 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -106,7 +107,6 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -342,7 +342,8 @@ public class RemoteLogManagerTest {
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
-
doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class)))
+ .thenReturn(Optional.empty());
// Verify the metrics for remote writes and for failures is zero
before attempt to copy log segment
assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
@@ -353,7 +354,7 @@ public class RemoteLogManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);
@@ -397,6 +398,100 @@ public class RemoteLogManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
}
+ // We are verifying that if the size of a piece of custom metadata is
bigger than the configured limit,
+ // the copy task should be cancelled and there should be an attempt to
delete the just copied segment.
+ @Test
+ void testCustomMetadataSizeExceedsLimit() throws Exception {
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+ long lastStableOffset = 150L;
+ long logEndOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(-1L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+ verify(oldSegment, times(0)).readNextOffset();
+ verify(activeSegment, times(0)).readNextOffset();
+
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
+
+ ProducerStateManager mockStateManager =
mock(ProducerStateManager.class);
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+ when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
+ when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+
+ LazyIndex idx =
LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset,
""), oldSegmentStartOffset, 1000);
+ LazyIndex timeIdx =
LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""),
oldSegmentStartOffset, 1500);
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new
TransactionIndex(oldSegmentStartOffset, txnFile);
+ when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
+ when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
+ when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+ int customMetadataSizeLimit = 128;
+ CustomMetadata customMetadata = new CustomMetadata(new
byte[customMetadataSizeLimit * 2]);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class)))
+ .thenReturn(Optional.of(customMetadata));
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, customMetadataSizeLimit);
+ task.convertToLeader(2);
+ task.copyLogSegmentsToRemote(mockLog);
+
+ ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg =
ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
+
verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
+
+ // Check we attempt to delete the segment data providing the custom
metadata back.
+ RemoteLogSegmentMetadataUpdate expectedMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(
+ remoteLogSegmentMetadataArg.getValue().remoteLogSegmentId(),
time.milliseconds(),
+ Optional.of(customMetadata),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ RemoteLogSegmentMetadata expectedDeleteMetadata =
remoteLogSegmentMetadataArg.getValue().createWithUpdates(expectedMetadataUpdate);
+ verify(remoteStorageManager,
times(1)).deleteLogSegmentData(eq(expectedDeleteMetadata));
+
+ // Check the task is cancelled in the end.
+ assertTrue(task.isCancelled());
+
+ // The metadata update should not be posted.
+ verify(remoteLogMetadataManager,
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+
+ // Verify the copy request is counted, but no copied bytes are reported.
+ assertEquals(1,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
+ // Verify the failure of the remote write is reported
+ assertEquals(1,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(1,
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+ assertEquals(0,
brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
+ assertEquals(1,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+ }
+
@Test
void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
long oldSegmentStartOffset = 0L;
@@ -532,7 +627,7 @@ public class RemoteLogManagerTest {
// Verify aggregate metrics
assertEquals(0,
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);
@@ -572,7 +667,7 @@ public class RemoteLogManagerTest {
when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
when(mockLog.lastStableOffset()).thenReturn(250L);
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
task.convertToFollower();
task.copyLogSegmentsToRemote(mockLog);
@@ -714,7 +809,7 @@ public class RemoteLogManagerTest {
@Test
void testRLMTaskShouldSetLeaderEpochCorrectly() {
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
assertFalse(task.isLeader());
task.convertToLeader(1);
assertTrue(task.isLeader());
@@ -862,7 +957,7 @@ public class RemoteLogManagerTest {
when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1,
segment2, activeSegment)));
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
@@ -888,7 +983,7 @@ public class RemoteLogManagerTest {
when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1,
segment2, segment3, activeSegment)));
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 47ae7bf21ad..9b589322bbf 100644
---
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -19,10 +19,12 @@ package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
+import java.util.Optional;
import java.util.TreeMap;
/**
@@ -66,6 +68,11 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
*/
private final int segmentSizeInBytes;
+ /**
+ * Custom metadata.
+ */
+ private final Optional<CustomMetadata> customMetadata;
+
/**
* It indicates the state in which the action is executed on this segment.
*/
@@ -84,6 +91,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
* @param brokerId Broker id from which this event is generated.
* @param eventTimestampMs Epoch time in milli seconds at which the
remote log segment is copied to the remote tier storage.
* @param segmentSizeInBytes Size of this segment in bytes.
+ * @param customMetadata Custom metadata.
* @param state State of the respective segment of
remoteLogSegmentId.
* @param segmentLeaderEpochs leader epochs occurred within this segment.
*/
@@ -94,6 +102,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
int brokerId,
long eventTimestampMs,
int segmentSizeInBytes,
+ Optional<CustomMetadata> customMetadata,
RemoteLogSegmentState state,
Map<Integer, Long> segmentLeaderEpochs) {
super(brokerId, eventTimestampMs);
@@ -112,6 +121,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
this.endOffset = endOffset;
this.maxTimestampMs = maxTimestampMs;
this.segmentSizeInBytes = segmentSizeInBytes;
+ this.customMetadata = Objects.requireNonNull(customMetadata,
"customMetadata can not be null");
if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
throw new IllegalArgumentException("segmentLeaderEpochs can not be
null or empty");
@@ -149,6 +159,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
maxTimestampMs,
brokerId,
eventTimestampMs, segmentSizeInBytes,
+ Optional.empty(),
RemoteLogSegmentState.COPY_SEGMENT_STARTED,
segmentLeaderEpochs);
}
@@ -196,6 +207,13 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
return segmentLeaderEpochs;
}
+ /**
+ * @return Custom metadata.
+ */
+ public Optional<CustomMetadata> customMetadata() {
+ return customMetadata;
+ }
+
/**
* Returns the current state of this remote log segment. It can be any of
the below
* <ul>
@@ -223,7 +241,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset,
endOffset, maxTimestampMs, rlsmUpdate.brokerId(),
rlsmUpdate.eventTimestampMs(),
- segmentSizeInBytes, rlsmUpdate.state(), segmentLeaderEpochs);
+ segmentSizeInBytes, rlsmUpdate.customMetadata(),
rlsmUpdate.state(), segmentLeaderEpochs);
}
@Override
@@ -244,7 +262,9 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
&& maxTimestampMs == that.maxTimestampMs
&& segmentSizeInBytes == that.segmentSizeInBytes
&& Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId)
- && Objects.equals(segmentLeaderEpochs,
that.segmentLeaderEpochs) && state == that.state
+ && Objects.equals(segmentLeaderEpochs,
that.segmentLeaderEpochs)
+ && Objects.equals(customMetadata, that.customMetadata)
+ && state == that.state
&& eventTimestampMs() == that.eventTimestampMs()
&& brokerId() == that.brokerId();
}
@@ -252,7 +272,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
@Override
public int hashCode() {
return Objects.hash(remoteLogSegmentId, startOffset, endOffset,
brokerId(), maxTimestampMs,
- eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes,
state);
+ eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes,
customMetadata, state);
}
@Override
@@ -266,8 +286,57 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
", eventTimestampMs=" + eventTimestampMs() +
", segmentLeaderEpochs=" + segmentLeaderEpochs +
", segmentSizeInBytes=" + segmentSizeInBytes +
+ ", customMetadata=" + customMetadata +
", state=" + state +
'}';
}
+ /**
+ * Custom metadata from a {@link RemoteStorageManager} plugin.
+ *
+ * <p>The content of these metadata is RSM-dependent and is opaque to the
broker, i.e.
+ * it's not interpreted, only stored along with the rest of the remote log
segment metadata.
+ *
+ * <p>Examples of such metadata are:
+ * <ol>
+ * <li>The storage path on the remote storage in case it's
nondeterministic or version-dependent.</li>
+ * <li>The actual size of all the files related to the segment on the
remote storage.</li>
+ * </ol>
+ *
+ * <p>The maximum size the broker accepts and stores is controlled by
+ * the {@code remote.log.metadata.custom.metadata.max.bytes} setting.
+ */
+ public static class CustomMetadata {
+ private final byte[] value;
+
+ public CustomMetadata(byte[] value) {
+ this.value = value;
+ }
+
+ public byte[] value() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CustomMetadata that = (CustomMetadata) o;
+ return Arrays.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(value);
+ }
+
+ @Override
+ public String toString() {
+ return "CustomMetadata{" + value.length + " bytes}";
+ }
+ }
}
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
index a01df9602d2..210615ef53f 100644
---
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
@@ -18,8 +18,10 @@ package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import java.util.Objects;
+import java.util.Optional;
/**
* It describes the metadata update about the log segment in the remote
storage. This is currently used to update the
@@ -34,6 +36,11 @@ public class RemoteLogSegmentMetadataUpdate extends
RemoteLogMetadata {
*/
private final RemoteLogSegmentId remoteLogSegmentId;
+ /**
+ * Custom metadata.
+ */
+ private final Optional<CustomMetadata> customMetadata;
+
/**
* It indicates the state in which the action is executed on this segment.
*/
@@ -42,13 +49,17 @@ public class RemoteLogSegmentMetadataUpdate extends
RemoteLogMetadata {
/**
* @param remoteLogSegmentId Universally unique remote log segment id.
* @param eventTimestampMs Epoch time in milli seconds at which the
remote log segment is copied to the remote tier storage.
+ * @param customMetadata Custom metadata.
* @param state State of the remote log segment.
* @param brokerId Broker id from which this event is generated.
*/
public RemoteLogSegmentMetadataUpdate(RemoteLogSegmentId
remoteLogSegmentId, long eventTimestampMs,
- RemoteLogSegmentState state, int
brokerId) {
+ Optional<CustomMetadata>
customMetadata,
+ RemoteLogSegmentState state,
+ int brokerId) {
super(brokerId, eventTimestampMs);
this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId,
"remoteLogSegmentId can not be null");
+ this.customMetadata = Objects.requireNonNull(customMetadata,
"customMetadata can not be null");
this.state = Objects.requireNonNull(state, "state can not be null");
}
@@ -59,6 +70,13 @@ public class RemoteLogSegmentMetadataUpdate extends
RemoteLogMetadata {
return remoteLogSegmentId;
}
+ /**
+ * @return Custom metadata.
+ */
+ public Optional<CustomMetadata> customMetadata() {
+ return customMetadata;
+ }
+
/**
* It represents the state of the remote log segment. It can be one of the
values of {@link RemoteLogSegmentState}.
*/
@@ -81,6 +99,7 @@ public class RemoteLogSegmentMetadataUpdate extends
RemoteLogMetadata {
}
RemoteLogSegmentMetadataUpdate that = (RemoteLogSegmentMetadataUpdate)
o;
return Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId) &&
+ Objects.equals(customMetadata, that.customMetadata) &&
state == that.state &&
eventTimestampMs() == that.eventTimestampMs() &&
brokerId() == that.brokerId();
@@ -88,14 +107,15 @@ public class RemoteLogSegmentMetadataUpdate extends
RemoteLogMetadata {
@Override
public int hashCode() {
- return Objects.hash(remoteLogSegmentId, state, eventTimestampMs(),
brokerId());
+ return Objects.hash(remoteLogSegmentId, customMetadata, state,
eventTimestampMs(), brokerId());
}
@Override
public String toString() {
return "RemoteLogSegmentMetadataUpdate{" +
"remoteLogSegmentId=" + remoteLogSegmentId +
- ", state=" + state +
+ ", customMetadata=" + customMetadata +
+ ", state=" + state +
", eventTimestampMs=" + eventTimestampMs() +
", brokerId=" + brokerId() +
'}';
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
index cc26109969c..fa819979b2c 100644
---
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
@@ -18,9 +18,11 @@ package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import java.io.Closeable;
import java.io.InputStream;
+import java.util.Optional;
/**
* This interface provides the lifecycle of remote log segments that includes
copy, fetch, and delete from remote
@@ -81,10 +83,11 @@ public interface RemoteStorageManager extends Configurable,
Closeable {
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @param logSegmentData data to be copied to tiered storage.
+ * @return custom metadata to be added to the segment metadata after copying.
* @throws RemoteStorageException if there are any errors in storing the data of the segment.
*/
- void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- LogSegmentData logSegmentData)
+ Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData)
throws RemoteStorageException;
/**
diff --git
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
index 8a83033aa04..dfd905ce0cd 100644
---
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
+++
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
@@ -16,14 +16,18 @@
*/
package org.apache.kafka.server.log.remote.storage;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;
+import java.util.Optional;
public class NoOpRemoteStorageManager implements RemoteStorageManager {
@Override
- public void copyLogSegmentData(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
- LogSegmentData logSegmentData) {
+ public Optional<CustomMetadata>
copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ LogSegmentData
logSegmentData) {
+ return Optional.empty();
}
@Override
diff --git
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
new file mode 100644
index 00000000000..4cd2b350441
--- /dev/null
+++
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataTest {
+ private static final TopicIdPartition TP0 = new
TopicIdPartition(Uuid.randomUuid(),
+ new TopicPartition("foo", 0));
+
+ @Test
+ void createWithUpdates() {
+ int brokerId = 0;
+ int eventTimestamp = 0;
+ int brokerIdFinished = 1;
+ int timestampFinished = 1;
+ long startOffset = 0L;
+ long endOffset = 100L;
+ int segmentSize = 123;
+ long maxTimestamp = -1L;
+
+ Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+ segmentLeaderEpochs.put(0, 0L);
+ RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+ maxTimestamp, brokerId, eventTimestamp, segmentSize,
+ segmentLeaderEpochs);
+
+ CustomMetadata customMetadata = new CustomMetadata(new byte[]{0, 1, 2,
3});
+ RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(
+ segmentId, timestampFinished, Optional.of(customMetadata),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ brokerIdFinished);
+ RemoteLogSegmentMetadata updatedMetadata =
segmentMetadata.createWithUpdates(segmentMetadataUpdate);
+
+ RemoteLogSegmentMetadata expectedUpdatedMetadata = new
RemoteLogSegmentMetadata(
+ segmentId, startOffset, endOffset,
+ maxTimestamp, brokerIdFinished, timestampFinished,
segmentSize, Optional.of(customMetadata),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ segmentLeaderEpochs
+ );
+ assertEquals(expectedUpdatedMetadata, updatedMetadata);
+
+ // Check that the original metadata have not changed.
+ assertEquals(segmentId, segmentMetadata.remoteLogSegmentId());
+ assertEquals(startOffset, segmentMetadata.startOffset());
+ assertEquals(endOffset, segmentMetadata.endOffset());
+ assertEquals(maxTimestamp, segmentMetadata.maxTimestampMs());
+ assertEquals(brokerId, segmentMetadata.brokerId());
+ assertEquals(eventTimestamp, segmentMetadata.eventTimestampMs());
+ assertEquals(segmentSize, segmentMetadata.segmentSizeInBytes());
+ assertEquals(segmentLeaderEpochs,
segmentMetadata.segmentLeaderEpochs());
+ }
+}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
index 15e4562a2da..0b0b2817061 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
@@ -77,7 +77,8 @@ public class FileBasedRemoteLogMetadataCache extends
RemoteLogMetadataCache {
private RemoteLogSegmentMetadata
createRemoteLogSegmentMetadata(RemoteLogSegmentMetadataSnapshot snapshot) {
return new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(topicIdPartition, snapshot.segmentId()),
snapshot.startOffset(),
snapshot.endOffset(),
snapshot.maxTimestampMs(), snapshot.brokerId(), snapshot.eventTimestampMs(),
- snapshot.segmentSizeInBytes(),
snapshot.state(), snapshot.segmentLeaderEpochs());
+ snapshot.segmentSizeInBytes(),
snapshot.customMetadata(), snapshot.state(), snapshot.segmentLeaderEpochs()
+ );
}
/**
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
index c936d5056f8..ec1ed6a66d1 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import java.util.Collections;
@@ -27,6 +28,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
+import java.util.Optional;
/**
* This class represents the entry containing the metadata about a remote log
segment. This is similar to
@@ -68,6 +70,11 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
*/
private final int segmentSizeInBytes;
+ /**
+ * Custom metadata.
+ */
+ private final Optional<CustomMetadata> customMetadata;
+
/**
* It indicates the state in which the action is executed on this segment.
*/
@@ -79,13 +86,14 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
* {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
* then it should have an entry with epoch mapping to start-offset of this segment.
*
- * @param segmentId Universally unique remote log segment id.
+ * @param segmentId Universally unique remote log segment id.
* @param startOffset Start offset of this segment (inclusive).
* @param endOffset End offset of this segment (inclusive).
* @param maxTimestampMs Maximum timestamp in milliseconds in this segment.
* @param brokerId Broker id from which this event is generated.
* @param eventTimestampMs Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
* @param segmentSizeInBytes Size of this segment in bytes.
+ * @param customMetadata Custom metadata.
* @param state State of the respective segment of remoteLogSegmentId.
* @param segmentLeaderEpochs leader epochs occurred within this segment.
*/
@@ -96,6 +104,7 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
int brokerId,
long eventTimestampMs,
int segmentSizeInBytes,
+ Optional<CustomMetadata>
customMetadata,
RemoteLogSegmentState state,
Map<Integer, Long>
segmentLeaderEpochs) {
super(brokerId, eventTimestampMs);
@@ -106,6 +115,7 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
this.endOffset = endOffset;
this.maxTimestampMs = maxTimestampMs;
this.segmentSizeInBytes = segmentSizeInBytes;
+ this.customMetadata = Objects.requireNonNull(customMetadata,
"customMetadata can not be null");
if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
throw new IllegalArgumentException("segmentLeaderEpochs can not be null or empty");
@@ -117,7 +127,8 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
public static RemoteLogSegmentMetadataSnapshot
create(RemoteLogSegmentMetadata metadata) {
return new
RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(),
metadata.startOffset(), metadata.endOffset(),
metadata.maxTimestampMs(),
metadata.brokerId(), metadata.eventTimestampMs(),
-
metadata.segmentSizeInBytes(), metadata.state(),
metadata.segmentLeaderEpochs());
+
metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(),
metadata.segmentLeaderEpochs()
+ );
}
/**
@@ -162,6 +173,13 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
return segmentLeaderEpochs;
}
+ /**
+ * @return Custom metadata.
+ */
+ public Optional<CustomMetadata> customMetadata() {
+ return customMetadata;
+ }
+
/**
* Returns the current state of this remote log segment. It can be any of
the below
* <ul>
@@ -185,13 +203,19 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
if (this == o) return true;
if (!(o instanceof RemoteLogSegmentMetadataSnapshot)) return false;
RemoteLogSegmentMetadataSnapshot that =
(RemoteLogSegmentMetadataSnapshot) o;
- return startOffset == that.startOffset && endOffset == that.endOffset
&& maxTimestampMs == that.maxTimestampMs && segmentSizeInBytes ==
that.segmentSizeInBytes && Objects.equals(
- segmentId, that.segmentId) &&
Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state ==
that.state;
+ return startOffset == that.startOffset
+ && endOffset == that.endOffset
+ && maxTimestampMs == that.maxTimestampMs
+ && segmentSizeInBytes == that.segmentSizeInBytes
+ && Objects.equals(customMetadata, that.customMetadata)
+ && Objects.equals(segmentId, that.segmentId)
+ && Objects.equals(segmentLeaderEpochs,
that.segmentLeaderEpochs)
+ && state == that.state;
}
@Override
public int hashCode() {
- return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs,
segmentLeaderEpochs, segmentSizeInBytes, state);
+ return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs,
segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
}
@Override
@@ -203,6 +227,7 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
", maxTimestampMs=" + maxTimestampMs +
", segmentLeaderEpochs=" + segmentLeaderEpochs +
", segmentSizeInBytes=" + segmentSizeInBytes +
+ ", customMetadata=" + customMetadata +
", state=" + state +
'}';
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
index bd613f8c9cd..ad47ee05c84 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
@@ -19,11 +19,13 @@ package
org.apache.kafka.server.log.remote.metadata.storage.serialization;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
import
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataSnapshotRecord;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
public class RemoteLogSegmentMetadataSnapshotTransform implements
RemoteLogMetadataTransform<RemoteLogSegmentMetadataSnapshot> {
@@ -39,6 +41,7 @@ public class RemoteLogSegmentMetadataSnapshotTransform
implements RemoteLogMetad
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs()))
.setRemoteLogSegmentState(segmentMetadata.state().id());
+ segmentMetadata.customMetadata().ifPresent(md ->
record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record,
record.highestSupportedVersion());
}
@@ -59,6 +62,7 @@ public class RemoteLogSegmentMetadataSnapshotTransform
implements RemoteLogMetad
segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(),
segmentLeaderEpoch.offset());
}
+ Optional<CustomMetadata> customMetadata =
Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
return new RemoteLogSegmentMetadataSnapshot(record.segmentId(),
record.startOffset(),
record.endOffset(),
@@ -66,6 +70,7 @@ public class RemoteLogSegmentMetadataSnapshotTransform
implements RemoteLogMetad
record.brokerId(),
record.eventTimestampMs(),
record.segmentSizeInBytes(),
+ customMetadata,
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
segmentLeaderEpochs);
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index 4282b9e71f3..9e893d2cbc3 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -22,12 +22,14 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
import
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
public class RemoteLogSegmentMetadataTransform implements
RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {
@@ -43,6 +45,7 @@ public class RemoteLogSegmentMetadataTransform implements
RemoteLogMetadataTrans
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata))
.setRemoteLogSegmentState(segmentMetadata.state().id());
+ segmentMetadata.customMetadata().ifPresent(md ->
record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record,
record.highestSupportedVersion());
}
@@ -75,6 +78,7 @@ public class RemoteLogSegmentMetadataTransform implements
RemoteLogMetadataTrans
segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(),
segmentLeaderEpoch.offset());
}
+ Optional<CustomMetadata> customMetadata =
Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
RemoteLogSegmentMetadata remoteLogSegmentMetadata =
new RemoteLogSegmentMetadata(remoteLogSegmentId,
record.startOffset(), record.endOffset(),
record.maxTimestampMs(),
record.brokerId(),
@@ -82,6 +86,7 @@ public class RemoteLogSegmentMetadataTransform implements
RemoteLogMetadataTrans
segmentLeaderEpochs);
RemoteLogSegmentMetadataUpdate rlsmUpdate
= new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId,
record.eventTimestampMs(),
+ customMetadata,
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
record.brokerId());
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
index 3db776520c6..e2d2bf8049c 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
@@ -21,9 +21,12 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import java.util.Optional;
+
public class RemoteLogSegmentMetadataUpdateTransform implements
RemoteLogMetadataTransform<RemoteLogSegmentMetadataUpdate> {
public ApiMessageAndVersion
toApiMessageAndVersion(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) {
@@ -32,6 +35,7 @@ public class RemoteLogSegmentMetadataUpdateTransform
implements RemoteLogMetadat
.setBrokerId(segmentMetadataUpdate.brokerId())
.setEventTimestampMs(segmentMetadataUpdate.eventTimestampMs())
.setRemoteLogSegmentState(segmentMetadataUpdate.state().id());
+ segmentMetadataUpdate.customMetadata().ifPresent(md ->
record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record,
record.highestSupportedVersion());
}
@@ -42,8 +46,9 @@ public class RemoteLogSegmentMetadataUpdateTransform
implements RemoteLogMetadat
TopicIdPartition topicIdPartition = new
TopicIdPartition(entry.topicIdPartition().id(),
new TopicPartition(entry.topicIdPartition().name(),
entry.topicIdPartition().partition()));
+ Optional<CustomMetadata> customMetadata =
Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
return new RemoteLogSegmentMetadataUpdate(new
RemoteLogSegmentId(topicIdPartition, entry.id()),
- record.eventTimestampMs(),
RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
+ record.eventTimestampMs(), customMetadata,
RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
}
private RemoteLogSegmentMetadataUpdateRecord.RemoteLogSegmentIdEntry
createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadataUpdate data) {
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
index 6284e4cb78e..0260264e344 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
@@ -18,9 +18,12 @@ package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.storage.internals.log.StorageAction;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
+import java.util.Optional;
/**
* A wrapper class of {@link RemoteStorageManager} that sets the context class
loader when calling the respective
@@ -66,12 +69,9 @@ public class ClassLoaderAwareRemoteStorageManager implements
RemoteStorageManage
}
}
- public void copyLogSegmentData(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ public Optional<CustomMetadata>
copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
LogSegmentData logSegmentData) throws
RemoteStorageException {
- withClassLoader(() -> {
- delegate.copyLogSegmentData(remoteLogSegmentMetadata,
logSegmentData);
- return null;
- });
+ return withClassLoader(() ->
delegate.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData));
}
@Override
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 17a2746fdb3..1167ee73c66 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -81,6 +81,13 @@ public final class RemoteLogManagerConfig {
public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC =
"Listener name of the local broker to which it should get connected if " +
"needed by RemoteLogMetadataManager implementation.";
+ public static final String REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP = "remote.log.metadata.custom.metadata.max.bytes";
+ public static final String REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC = "The maximum size of custom metadata in bytes that the broker " +
+ "should accept from a remote storage plugin. If custom metadata exceeds this limit, the updated segment metadata " +
+ "will not be stored, the copied data will be attempted to delete, " +
+ "and the remote copying task for this topic-partition will stop with an error.";
+ public static final int DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES = 128;
+
public static final String
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =
"remote.log.index.file.cache.total.size.bytes";
public static final String
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space
allocated to store index files fetched " +
"from remote storage in the local storage.";
@@ -181,6 +188,12 @@ public final class RemoteLogManagerConfig {
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
+
.defineInternal(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+ INT,
+
DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES,
+ atLeast(0),
+ LOW,
+
REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
.defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
LONG,
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
@@ -260,6 +273,7 @@ public final class RemoteLogManagerConfig {
private final String remoteLogMetadataManagerPrefix;
private final HashMap<String, Object> remoteLogMetadataManagerProps;
private final String remoteLogMetadataManagerListenerName;
+ private final int remoteLogMetadataCustomMetadataMaxBytes;
public RemoteLogManagerConfig(AbstractConfig config) {
this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
@@ -276,6 +290,7 @@ public final class RemoteLogManagerConfig {
config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP),
config.getInt(REMOTE_LOG_READER_THREADS_PROP),
config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
+ config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) !=
null
?
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP))
@@ -301,6 +316,7 @@ public final class RemoteLogManagerConfig {
double remoteLogManagerTaskRetryJitter,
int remoteLogReaderThreads,
int remoteLogReaderMaxPendingTasks,
+ int remoteLogMetadataCustomMetadataMaxBytes,
String remoteStorageManagerPrefix,
Map<String, Object>
remoteStorageManagerProps, /* properties having keys stripped out with
remoteStorageManagerPrefix */
String remoteLogMetadataManagerPrefix,
@@ -324,6 +340,7 @@ public final class RemoteLogManagerConfig {
this.remoteLogMetadataManagerPrefix = remoteLogMetadataManagerPrefix;
this.remoteLogMetadataManagerProps = new
HashMap<>(remoteLogMetadataManagerProps);
this.remoteLogMetadataManagerListenerName =
remoteLogMetadataManagerListenerName;
+ this.remoteLogMetadataCustomMetadataMaxBytes =
remoteLogMetadataCustomMetadataMaxBytes;
}
public boolean enableRemoteStorageSystem() {
@@ -382,6 +399,10 @@ public final class RemoteLogManagerConfig {
return remoteLogMetadataManagerListenerName;
}
+ public int remoteLogMetadataCustomMetadataMaxBytes() {
+ return remoteLogMetadataCustomMetadataMaxBytes;
+ }
+
public String remoteStorageManagerPrefix() {
return remoteStorageManagerPrefix;
}
@@ -412,6 +433,7 @@ public final class RemoteLogManagerConfig {
&& remoteLogManagerTaskRetryJitter ==
that.remoteLogManagerTaskRetryJitter
&& remoteLogReaderThreads == that.remoteLogReaderThreads
&& remoteLogReaderMaxPendingTasks ==
that.remoteLogReaderMaxPendingTasks
+ && remoteLogMetadataCustomMetadataMaxBytes ==
that.remoteLogMetadataCustomMetadataMaxBytes
&& Objects.equals(remoteStorageManagerClassName,
that.remoteStorageManagerClassName)
&& Objects.equals(remoteStorageManagerClassPath,
that.remoteStorageManagerClassPath)
&& Objects.equals(remoteLogMetadataManagerClassName,
that.remoteLogMetadataManagerClassName)
@@ -427,7 +449,7 @@ public final class RemoteLogManagerConfig {
public int hashCode() {
return Objects.hash(enableRemoteStorageSystem,
remoteStorageManagerClassName, remoteStorageManagerClassPath,
remoteLogMetadataManagerClassName,
remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
- remoteLogIndexFileCacheTotalSizeBytes,
remoteLogManagerThreadPoolSize, remoteLogManagerTaskIntervalMs,
+ remoteLogMetadataCustomMetadataMaxBytes,
remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize,
remoteLogManagerTaskIntervalMs,
remoteLogManagerTaskRetryBackoffMs,
remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
remoteLogReaderThreads,
remoteLogReaderMaxPendingTasks, remoteStorageManagerProps,
remoteLogMetadataManagerProps,
remoteStorageManagerPrefix,
remoteLogMetadataManagerPrefix);
diff --git
a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
index d18144e4dfe..c737135a6a2 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
@@ -116,6 +116,14 @@
"versions": "0+",
"about": "Segment size in bytes."
},
+ {
+ "name": "CustomMetadata",
+ "type": "bytes",
+ "default": "null",
+ "versions": "0+",
+ "nullableVersions": "0+",
+ "about": "Custom metadata."
+ },
{
"name": "RemoteLogSegmentState",
"type": "int8",
diff --git
a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
index dbb29139c19..20fb1732572 100644
---
a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
+++
b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
@@ -82,6 +82,14 @@
"versions": "0+",
"about": "Segment size in bytes"
},
+ {
+ "name": "CustomMetadata",
+ "type": "bytes",
+ "default": "null",
+ "versions": "0+",
+ "nullableVersions": "0+",
+ "about": "Custom metadata."
+ },
{
"name": "RemoteLogSegmentState",
"type": "int8",
diff --git
a/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
b/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
index 24003dcbce8..48aa34d4e91 100644
---
a/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
+++
b/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
@@ -72,6 +72,14 @@
"versions": "0+",
"about": "Epoch time in milli seconds at which this event is generated."
},
+ {
+ "name": "CustomMetadata",
+ "type": "bytes",
+ "default": "null",
+ "versions": "0+",
+ "nullableVersions": "0+",
+ "about": "Custom metadata."
+ },
{
"name": "RemoteLogSegmentState",
"type": "int8",
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
index 5f77417a458..d5341e07b07 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.test.TestUtils;
@@ -50,8 +51,10 @@ public class FileBasedRemoteLogMetadataCacheTest {
0,
100, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
1024
* 1024, Collections.singletonMap(0, 0L));
cache.addCopyInProgressSegment(metadata1);
- RemoteLogSegmentMetadataUpdate metadataUpdate1 = new
RemoteLogSegmentMetadataUpdate(segmentId1, System.currentTimeMillis(),
-
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ RemoteLogSegmentMetadataUpdate metadataUpdate1 = new
RemoteLogSegmentMetadataUpdate(
+ segmentId1, System.currentTimeMillis(),
+ Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
cache.updateRemoteLogSegmentMetadata(metadataUpdate1);
Optional<RemoteLogSegmentMetadata> receivedMetadata =
cache.remoteLogSegmentMetadata(0, 0L);
assertTrue(receivedMetadata.isPresent());
@@ -63,8 +66,10 @@ public class FileBasedRemoteLogMetadataCacheTest {
0,
900, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
1024
* 1024, Collections.singletonMap(0, 0L));
cache.addCopyInProgressSegment(metadata2);
- RemoteLogSegmentMetadataUpdate metadataUpdate2 = new
RemoteLogSegmentMetadataUpdate(segmentId2, System.currentTimeMillis(),
-
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ RemoteLogSegmentMetadataUpdate metadataUpdate2 = new
RemoteLogSegmentMetadataUpdate(
+ segmentId2, System.currentTimeMillis(),
+ Optional.of(new CustomMetadata(new byte[]{4, 5, 6, 7})),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
cache.updateRemoteLogSegmentMetadata(metadataUpdate2);
// Fetch segment for leader epoch:0 and start offset:0, it should be
the newly added segment.
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
index 789997feed2..6fe08462803 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
public class RemoteLogMetadataCacheTest {
@@ -57,7 +58,7 @@ public class RemoteLogMetadataCacheTest {
-1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE,
Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata updatedMetadata = segmentMetadata
.createWithUpdates(new
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
- time.milliseconds(), state, BROKER_ID_1));
+ time.milliseconds(), Optional.empty(), state,
BROKER_ID_1));
Assertions.assertThrows(IllegalArgumentException.class, () ->
cache.addCopyInProgressSegment(updatedMetadata));
}
@@ -67,7 +68,9 @@ public class RemoteLogMetadataCacheTest {
Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> {
RemoteLogSegmentId nonExistingId = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
cache.updateRemoteLogSegmentMetadata(new
RemoteLogSegmentMetadataUpdate(nonExistingId,
- time.milliseconds(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
+ time.milliseconds(),
+ Optional.empty(),
+ RemoteLogSegmentState.DELETE_SEGMENT_STARTED,
BROKER_ID_1));
});
// Check for invalid state transition.
@@ -75,7 +78,9 @@ public class RemoteLogMetadataCacheTest {
RemoteLogSegmentMetadata segmentMetadata =
createSegmentUpdateWithState(cache, Collections.singletonMap(0, 0L), 0,
100, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
cache.updateRemoteLogSegmentMetadata(new
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
- time.milliseconds(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
+ time.milliseconds(),
+ Optional.empty(),
+ RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
BROKER_ID_1));
});
}
@@ -90,8 +95,11 @@ public class RemoteLogMetadataCacheTest {
BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
cache.addCopyInProgressSegment(segmentMetadata);
- RemoteLogSegmentMetadataUpdate segMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(segmentId,
-
time.milliseconds(), state, BROKER_ID_1);
+ RemoteLogSegmentMetadataUpdate segMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(
+ segmentId,
+ time.milliseconds(),
+ Optional.empty(),
+ state, BROKER_ID_1);
cache.updateRemoteLogSegmentMetadata(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index 3f9db8cee13..e3d1a2aee0c 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.Uuid;
import
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
@@ -30,6 +32,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -46,15 +49,22 @@ public class RemoteLogMetadataFormatterTest {
segLeaderEpochs.put(1, 20L);
segLeaderEpochs.put(2, 80L);
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0,
SEGMENT_ID);
+ Optional<CustomMetadata> customMetadata = Optional.of(new
CustomMetadata(new byte[10]));
RemoteLogSegmentMetadata remoteLogMetadata = new
RemoteLogSegmentMetadata(
remoteLogSegmentId, 0L, 100L, -1L, 1,
- 123L, 1024, segLeaderEpochs);
+ 123L, 1024, customMetadata,
+ RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs);
byte[] metadataBytes = new
RemoteLogMetadataSerde().serialize(remoteLogMetadata);
ConsumerRecord<byte[], byte[]> metadataRecord = new
ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes);
String expected = String.format(
- "partition: 0, offset: 0, value:
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=%s:foo-0,
id=%s}, startOffset=0, endOffset=100, brokerId=1, maxTimestampMs=-1,
eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80},
segmentSizeInBytes=1024, state=COPY_SEGMENT_STARTED}\n",
+ "partition: 0, offset: 0, value: " +
+
"RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=%s:foo-0,
id=%s}, " +
+ "startOffset=0, endOffset=100, brokerId=1,
maxTimestampMs=-1, " +
+ "eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20,
2=80}, segmentSizeInBytes=1024, " +
+ "customMetadata=Optional[CustomMetadata{10 bytes}], " +
+ "state=COPY_SEGMENT_STARTED}\n",
TOPIC_ID, SEGMENT_ID);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos)) {
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index 402d1a2994a..5b48790c7fd 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time;
import
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
@@ -34,6 +35,7 @@ import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
public class RemoteLogMetadataSerdeTest {
@@ -69,12 +71,17 @@ public class RemoteLogMetadataSerdeTest {
segLeaderEpochs.put(2, 80L);
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L,
1,
- time.milliseconds(), 1024,
segLeaderEpochs);
+ time.milliseconds(), 1024,
+ Optional.of(new CustomMetadata(new
byte[] {0, 1, 2, 3})),
+
RemoteLogSegmentState.COPY_SEGMENT_STARTED,
+ segLeaderEpochs
+ );
}
private RemoteLogSegmentMetadataUpdate
createRemoteLogSegmentMetadataUpdate() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId,
time.milliseconds(),
+ Optional.of(new
CustomMetadata(new byte[] {0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
index 1b4602887be..dbfbbf3b044 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.Uuid;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
@@ -61,10 +62,13 @@ public class RemoteLogMetadataSnapshotFileTest {
long startOffset = 0;
for (int i = 0; i < 100; i++) {
long endOffset = startOffset + 100L;
+ CustomMetadata customMetadata = new CustomMetadata(new
byte[]{(byte) i});
remoteLogSegmentMetadatas.add(
new RemoteLogSegmentMetadataSnapshot(Uuid.randomUuid(),
startOffset, endOffset,
System.currentTimeMillis(), 1, 100, 1024,
-
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i,
startOffset)));
+
Optional.of(customMetadata),
+
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i,
startOffset)
+ ));
startOffset = endOffset + 1;
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
index 87e76833293..504f47e17a5 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
@@ -27,6 +27,7 @@ import
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteL
import
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemotePartitionDeleteMetadataTransform;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
@@ -35,6 +36,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.Optional;
public class RemoteLogMetadataTransformTest {
private static final TopicIdPartition TP0 = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
@@ -58,6 +60,7 @@ public class RemoteLogMetadataTransformTest {
RemoteLogSegmentMetadataUpdate metadataUpdate =
new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0,
Uuid.randomUuid()), time.milliseconds(),
+ Optional.of(new
CustomMetadata(new byte[]{0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
ApiMessageAndVersion apiMessageAndVersion =
metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate);
RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord =
metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index b847e7cba3f..6928c6e281f 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -92,7 +92,8 @@ public class RemoteLogSegmentLifecycleTest {
});
RemoteLogSegmentMetadataUpdate segment0Update = new
RemoteLogSegmentMetadataUpdate(
- segment0Id, time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+ segment0Id, time.milliseconds(), Optional.empty(),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segment0Update);
RemoteLogSegmentMetadata expectedSegment0Metadata =
segment0Metadata.createWithUpdates(segment0Update);
@@ -167,6 +168,7 @@ public class RemoteLogSegmentLifecycleTest {
remoteLogSegmentLifecycleManager
.updateRemoteLogSegmentMetadata(new
RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
time.milliseconds(),
+
Optional.empty(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED,
BROKER_ID_1));
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0,
10).isPresent());
@@ -176,6 +178,7 @@ public class RemoteLogSegmentLifecycleTest {
remoteLogSegmentLifecycleManager
.updateRemoteLogSegmentMetadata(new
RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
time.milliseconds(),
+
Optional.empty(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
BROKER_ID_1));
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0,
10).isPresent());
@@ -218,7 +221,9 @@ public class RemoteLogSegmentLifecycleTest {
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
- RemoteLogSegmentMetadataUpdate segMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), state,
BROKER_ID_1);
+ RemoteLogSegmentMetadataUpdate segMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
+ Optional.empty(),
+ state, BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
@@ -367,6 +372,7 @@ public class RemoteLogSegmentLifecycleTest {
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
time.milliseconds(),
+
Optional.empty(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
new file mode 100644
index 00000000000..17d424249ff
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Round-trip test for {@link RemoteLogSegmentMetadataSnapshotTransform}: a snapshot converted to an
+ * {@link ApiMessageAndVersion} and back must be equal to the snapshot it started from.
+ */
+class RemoteLogSegmentMetadataSnapshotTransformTest {
+    /**
+     * Custom metadata variants fed to {@link #testToAndFromMessage(Optional)}: a four-byte payload,
+     * a zero-length payload, and no custom metadata at all.
+     */
+    private static Stream<Object> parameters() {
+        Optional<CustomMetadata> fourBytes = Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3}));
+        Optional<CustomMetadata> zeroBytes = Optional.of(new CustomMetadata(new byte[0]));
+        return Stream.of(fourBytes, zeroBytes, Optional.empty());
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+        Map<Integer, Long> leaderEpochs = new HashMap<>();
+        leaderEpochs.put(0, 0L);
+
+        RemoteLogSegmentMetadataSnapshot expected = new RemoteLogSegmentMetadataSnapshot(
+            Uuid.randomUuid(), 0L, 100L, -1L, 0, 0, 1234,
+            customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, leaderEpochs);
+
+        // Convert to the message form and straight back using the same transform instance.
+        RemoteLogSegmentMetadataSnapshotTransform transform = new RemoteLogSegmentMetadataSnapshotTransform();
+        ApiMessageAndVersion converted = transform.toApiMessageAndVersion(expected);
+        assertEquals(expected, transform.fromApiMessageAndVersion(converted));
+    }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
new file mode 100644
index 00000000000..84324e7435b
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Round-trip test for {@link RemoteLogSegmentMetadataTransform}: segment metadata converted to an
+ * {@link ApiMessageAndVersion} and back must be equal to the metadata it started from.
+ */
+class RemoteLogSegmentMetadataTransformTest {
+    /**
+     * Custom metadata variants fed to {@link #testToAndFromMessage(Optional)}: a four-byte payload,
+     * a zero-length payload, and no custom metadata at all.
+     */
+    private static Stream<Object> parameters() {
+        Optional<CustomMetadata> fourBytes = Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3}));
+        Optional<CustomMetadata> zeroBytes = Optional.of(new CustomMetadata(new byte[0]));
+        return Stream.of(fourBytes, zeroBytes, Optional.empty());
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+        Map<Integer, Long> leaderEpochs = new HashMap<>();
+        leaderEpochs.put(0, 0L);
+
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(
+            new TopicIdPartition(Uuid.randomUuid(), 0, "topic"), Uuid.randomUuid());
+        RemoteLogSegmentMetadata expected = new RemoteLogSegmentMetadata(
+            segmentId, 0L, 100L, -1L, 0, 0, 1234,
+            customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, leaderEpochs);
+
+        // Convert to the message form and straight back using the same transform instance.
+        RemoteLogSegmentMetadataTransform transform = new RemoteLogSegmentMetadataTransform();
+        ApiMessageAndVersion converted = transform.toApiMessageAndVersion(expected);
+        assertEquals(expected, transform.fromApiMessageAndVersion(converted));
+    }
+}
\ No newline at end of file
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java
new file mode 100644
index 00000000000..09f1b7620a0
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Round-trip test for {@link RemoteLogSegmentMetadataUpdateTransform}: a metadata update converted
+ * to an {@link ApiMessageAndVersion} and back must be equal to the update it started from.
+ */
+class RemoteLogSegmentMetadataUpdateTransformTest {
+    /**
+     * Custom metadata variants fed to {@link #testToAndFromMessage(Optional)}: a four-byte payload,
+     * a zero-length payload, and no custom metadata at all.
+     */
+    private static Stream<Object> parameters() {
+        Optional<CustomMetadata> fourBytes = Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3}));
+        Optional<CustomMetadata> zeroBytes = Optional.of(new CustomMetadata(new byte[0]));
+        return Stream.of(fourBytes, zeroBytes, Optional.empty());
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+        // NOTE(review): this map is populated but never passed to the update constructor below;
+        // it looks like a leftover and is kept only so the HashMap/Map imports stay in use.
+        Map<Integer, Long> leaderEpochs = new HashMap<>();
+        leaderEpochs.put(0, 0L);
+
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(
+            new TopicIdPartition(Uuid.randomUuid(), 0, "topic"), Uuid.randomUuid());
+        RemoteLogSegmentMetadataUpdate expected = new RemoteLogSegmentMetadataUpdate(
+            segmentId, 123L, customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
+
+        // Convert to the message form and straight back using the same transform instance.
+        RemoteLogSegmentMetadataUpdateTransform transform = new RemoteLogSegmentMetadataUpdateTransform();
+        ApiMessageAndVersion converted = transform.toApiMessageAndVersion(expected);
+        assertEquals(expected, transform.fromApiMessageAndVersion(converted));
+    }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
index c9541f66fec..8650cea5d47 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
@@ -26,8 +26,11 @@ import java.nio.file.Files;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
/**
* This class is an implementation of {@link RemoteStorageManager} backed by
in-memory store.
*/
@@ -52,8 +55,8 @@ public class InmemoryRemoteStorageManager implements
RemoteStorageManager {
}
@Override
- public void copyLogSegmentData(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
- LogSegmentData logSegmentData)
+ public Optional<CustomMetadata>
copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ LogSegmentData
logSegmentData)
throws RemoteStorageException {
log.debug("copying log segment and indexes for : {}",
remoteLogSegmentMetadata);
Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
@@ -83,6 +86,7 @@ public class InmemoryRemoteStorageManager implements
RemoteStorageManager {
throw new RemoteStorageException(e);
}
log.debug("copied log segment and indexes for : {} successfully.",
remoteLogSegmentMetadata);
+ return Optional.empty();
}
@Override
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index 058d8a5f649..a8847e5bba9 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
@@ -302,9 +303,9 @@ public final class LocalTieredStorage implements
RemoteStorageManager {
}
@Override
- public void copyLogSegmentData(final RemoteLogSegmentMetadata metadata,
final LogSegmentData data)
+ public Optional<CustomMetadata> copyLogSegmentData(final
RemoteLogSegmentMetadata metadata, final LogSegmentData data)
throws RemoteStorageException {
- Callable<Void> callable = () -> {
+ Callable<Optional<CustomMetadata>> callable = () -> {
final RemoteLogSegmentId id = metadata.remoteLogSegmentId();
final LocalTieredStorageEvent.Builder eventBuilder =
newEventBuilder(COPY_SEGMENT, id);
RemoteLogSegmentFileset fileset = null;
@@ -331,10 +332,10 @@ public final class LocalTieredStorage implements
RemoteStorageManager {
throw e;
}
- return null;
+ return Optional.empty();
};
- wrap(callable);
+ return wrap(callable);
}
@Override
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index bb3c2ff73c4..c8b428ca547 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -41,7 +41,7 @@ public class RemoteLogManagerConfigTest {
RemoteLogManagerConfig expectedRemoteLogManagerConfig
= new RemoteLogManagerConfig(true,
"dummy.remote.storage.class", "dummy.remote.storage.class.path",
"dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path",
- "listener.name", 1024 * 1024L, 1,
60000L, 100L, 60000L, 0.3, 10, 100,
+ "listener.name", 1024 * 1024L, 1,
60000L, 100L, 60000L, 0.3, 10, 100, 100,
rsmPrefix, rsmProps, rlmmPrefix,
rlmmProps);
Map<String, Object> props =
extractProps(expectedRemoteLogManagerConfig);
@@ -82,6 +82,8 @@ public class RemoteLogManagerConfigTest {
remoteLogManagerConfig.remoteLogReaderThreads());
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
remoteLogManagerConfig.remoteLogReaderMaxPendingTasks());
+
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+
remoteLogManagerConfig.remoteLogMetadataCustomMetadataMaxBytes());
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
remoteLogManagerConfig.remoteStorageManagerPrefix());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 95521c42dcf..528843a90f6 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -69,7 +69,8 @@ public class RemoteLogMetadataManagerTest {
// 2.Move that segment to COPY_SEGMENT_FINISHED state and this
segment should be available.
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
-
RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ Optional.empty(),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
BROKER_ID_1);
// Wait until the segment is updated successfully.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
@@ -108,7 +109,7 @@ public class RemoteLogMetadataManagerTest {
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(
- segmentId, time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+ segmentId, time.milliseconds(), Optional.empty(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
// Wait until the segment is updated successfully.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();