This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a3204aed2eb KAFKA-15194: Prepend offset in the filenames used by LocalTieredStorage (#14057)
a3204aed2eb is described below

commit a3204aed2eb6f2fe7d9f300cca686d3bc2a37c7c
Author: Owen Leung <[email protected]>
AuthorDate: Sat Jul 22 19:47:26 2023 +0800

    KAFKA-15194: Prepend offset in the filenames used by LocalTieredStorage (#14057)
    
    Reviewers: Divij Vaidya <[email protected]>
---
 .../kafka/storage/internals/log/LogFileUtils.java  |  2 +-
 .../log/remote/storage/LocalTieredStorage.java     | 34 +++++-----
 .../log/remote/storage/LocalTieredStorageTest.java | 77 ++++++++++++----------
 .../remote/storage/RemoteLogSegmentFileset.java    | 34 +++++-----
 4 files changed, 77 insertions(+), 70 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
index 2680b2dbe30..2664aeda690 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
@@ -84,7 +84,7 @@ public final class LogFileUtils {
      * @param offset The offset to use in the file name
      * @return The filename
      */
-    private static String filenamePrefixFromOffset(long offset) {
+    public static String filenamePrefixFromOffset(long offset) {
         NumberFormat nf = NumberFormat.getInstance();
         nf.setMinimumIntegerDigits(20);
         nf.setMaximumFractionDigits(0);
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index 3a17fcc24de..058d8a5f649 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -81,25 +81,25 @@ import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDir
  * The local tiered storage keeps a simple structure of directories mimicking 
that of Apache Kafka.
  * <p>
  * The name of each of the files under the scope of a log segment (the log 
file, its indexes, etc.)
- * follows the structure UuidBase64-FileType.
+ * follows the structure startOffset-UuidBase64-FileType.
  * <p>
  * Given the root directory of the storage, segments and associated files are 
organized as represented below.
  * </p>
  * <code>
- * / storage-directory  / topic-0-LWgrMmVrT0a__7a4SasuPA / 
bCqX9U--S-6U8XUM9II25Q.log
- * .                                                     . 
bCqX9U--S-6U8XUM9II25Q.index
- * .                                                     . 
bCqX9U--S-6U8XUM9II25Q.timeindex
- * .                                                     . 
h956soEzTzi9a-NOQ-DvKA.log
- * .                                                     . 
h956soEzTzi9a-NOQ-DvKA.index
- * .                                                     . 
h956soEzTzi9a-NOQ-DvKA.timeindex
+ * / storage-directory  / topic-0-LWgrMmVrT0a__7a4SasuPA / 
00000000000000000011-bCqX9U--S-6U8XUM9II25Q.log
+ * .                                                     . 
00000000000000000011-bCqX9U--S-6U8XUM9II25Q.index
+ * .                                                     . 
00000000000000000011-bCqX9U--S-6U8XUM9II25Q.timeindex
+ * .                                                     . 
00000000000000000011-h956soEzTzi9a-NOQ-DvKA.log
+ * .                                                     . 
00000000000000000011-h956soEzTzi9a-NOQ-DvKA.index
+ * .                                                     . 
00000000000000000011-h956soEzTzi9a-NOQ-DvKA.timeindex
  * .
- * / topic-1-LWgrMmVrT0a__7a4SasuPA / o8CQPT86QQmbFmi3xRmiHA.log
- * .                                . o8CQPT86QQmbFmi3xRmiHA.index
- * .                                . o8CQPT86QQmbFmi3xRmiHA.timeindex
+ * / topic-1-LWgrMmVrT0a__7a4SasuPA / 
00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.log
+ * .                                . 
00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.index
+ * .                                . 
00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.timeindex
  * .
- * / btopic-3-DRagLm_PS9Wl8fz1X43zVg / jvj3vhliTGeU90sIosmp_g.log
- * .                                 . jvj3vhliTGeU90sIosmp_g.index
- * .                                 . jvj3vhliTGeU90sIosmp_g.timeindex
+ * / topic-3-DRagLm_PS9Wl8fz1X43zVg / 
00000000000000000011-jvj3vhliTGeU90sIosmp_g.log
+ * .                                . 
00000000000000000011-jvj3vhliTGeU90sIosmp_g.index
+ * .                                . 
00000000000000000011-jvj3vhliTGeU90sIosmp_g.timeindex
  * </code>
  */
 public final class LocalTieredStorage implements RemoteStorageManager {
@@ -310,7 +310,7 @@ public final class LocalTieredStorage implements 
RemoteStorageManager {
             RemoteLogSegmentFileset fileset = null;
 
             try {
-                fileset = openFileset(storageDirectory, id);
+                fileset = openFileset(storageDirectory, metadata);
 
                 logger.info("Offloading log segment for {} from segment={}", 
id.topicIdPartition(), data.logSegment());
 
@@ -359,7 +359,7 @@ public final class LocalTieredStorage implements 
RemoteStorageManager {
             eventBuilder.withStartPosition(startPos).withEndPosition(endPos);
 
             try {
-                final RemoteLogSegmentFileset fileset = 
openFileset(storageDirectory, metadata.remoteLogSegmentId());
+                final RemoteLogSegmentFileset fileset = 
openFileset(storageDirectory, metadata);
 
                 final InputStream inputStream = 
newInputStream(fileset.getFile(SEGMENT).toPath(), READ);
                 inputStream.skip(startPos);
@@ -386,7 +386,7 @@ public final class LocalTieredStorage implements 
RemoteStorageManager {
             final LocalTieredStorageEvent.Builder eventBuilder = 
newEventBuilder(eventType, metadata);
 
             try {
-                final RemoteLogSegmentFileset fileset = 
openFileset(storageDirectory, metadata.remoteLogSegmentId());
+                final RemoteLogSegmentFileset fileset = 
openFileset(storageDirectory, metadata);
 
                 File file = fileset.getFile(fileType);
                 final InputStream inputStream = (fileType.isOptional() && 
!file.exists()) ?
@@ -411,7 +411,7 @@ public final class LocalTieredStorage implements 
RemoteStorageManager {
             if (deleteEnabled) {
                 try {
                     final RemoteLogSegmentFileset fileset = openFileset(
-                            storageDirectory, metadata.remoteLogSegmentId());
+                            storageDirectory, metadata);
 
                     if (!fileset.delete()) {
                         throw new RemoteStorageException("Failed to delete 
remote log segment with id:" +
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
index 7140684dac2..273ce6ce2a0 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
@@ -123,18 +123,19 @@ public final class LocalTieredStorageTest {
         final RemoteLogSegmentMetadata metadata = 
newRemoteLogSegmentMetadata(id);
         tieredStorage.copyLogSegmentData(metadata, segment);
 
-        remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
     }
 
     @Test
     public void copyDataFromLogSegment() throws RemoteStorageException {
         final byte[] data = new byte[]{0, 1, 2};
         final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final RemoteLogSegmentMetadata metadata = 
newRemoteLogSegmentMetadata(id);
         final LogSegmentData segment = localLogSegments.nextSegment(data);
 
-        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+        tieredStorage.copyLogSegmentData(metadata, segment);
 
-        remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(id, segment);
+        remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(metadata, 
segment);
     }
 
     @Test
@@ -201,41 +202,44 @@ public final class LocalTieredStorageTest {
     @Test
     public void deleteLogSegment() throws RemoteStorageException {
         final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final RemoteLogSegmentMetadata metadata = 
newRemoteLogSegmentMetadata(id);
         final LogSegmentData segment = localLogSegments.nextSegment();
 
         tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
-        remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
 
         tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
-        remoteStorageVerifier.verifyLogSegmentFilesAbsent(id);
+        remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata);
     }
 
     @Test
     public void deletePartition() throws RemoteStorageException {
         int segmentCount = 10;
-        List<RemoteLogSegmentId> segmentIds = new ArrayList<>();
+        List<RemoteLogSegmentMetadata> segmentMetadatas = new ArrayList<>();
         for (int i = 0; i < segmentCount; i++) {
             final RemoteLogSegmentId id = newRemoteLogSegmentId();
+            final RemoteLogSegmentMetadata metadata = 
newRemoteLogSegmentMetadata(id);
             final LogSegmentData segment = localLogSegments.nextSegment();
-            tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
-            remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
-            segmentIds.add(id);
+            tieredStorage.copyLogSegmentData(metadata, segment);
+            remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
+            segmentMetadatas.add(metadata);
         }
         tieredStorage.deletePartition(topicIdPartition);
         
remoteStorageVerifier.assertFileDoesNotExist(remoteStorageVerifier.expectedPartitionPath());
-        for (RemoteLogSegmentId segmentId: segmentIds) {
-            remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentId);
+        for (RemoteLogSegmentMetadata segmentMetadata: segmentMetadatas) {
+            remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentMetadata);
         }
     }
 
     @Test
     public void deleteLogSegmentWithoutOptionalFiles() throws 
RemoteStorageException {
         final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final RemoteLogSegmentMetadata metadata = 
newRemoteLogSegmentMetadata(id);
         final LogSegmentData segment = localLogSegments.nextSegment();
         segment.transactionIndex().get().toFile().delete();
 
-        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
-        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, path -> {
+        tieredStorage.copyLogSegmentData(metadata, segment);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata, path -> {
             String fileName = path.getFileName().toString();
             if (!fileName.contains(LogFileUtils.TXN_INDEX_FILE_SUFFIX)) {
                 remoteStorageVerifier.assertFileExists(path);
@@ -243,7 +247,7 @@ public final class LocalTieredStorageTest {
         });
 
         tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
-        remoteStorageVerifier.verifyLogSegmentFilesAbsent(id);
+        remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata);
     }
 
     @Test
@@ -252,12 +256,12 @@ public final class LocalTieredStorageTest {
 
         final RemoteLogSegmentId id = newRemoteLogSegmentId();
         final LogSegmentData segment = localLogSegments.nextSegment();
-
+        final RemoteLogSegmentMetadata metadata = 
newRemoteLogSegmentMetadata(id);
         tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
-        remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
 
         tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
-        remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
     }
 
     @Test
@@ -399,20 +403,21 @@ public final class LocalTieredStorageTest {
             this.topicIdPartition = requireNonNull(topicIdPartition);
         }
 
-        private List<Path> expectedPaths(final RemoteLogSegmentId id) {
+        private List<Path> expectedPaths(final RemoteLogSegmentMetadata 
metadata) {
             final String rootPath = getStorageRootDirectory();
             TopicPartition tp = topicIdPartition.topicPartition();
             final String topicPartitionSubpath = format("%s-%d-%s", 
tp.topic(), tp.partition(),
                     topicIdPartition.topicId());
-            final String uuid = id.id().toString();
+            final String uuid = metadata.remoteLogSegmentId().id().toString();
+            final String startOffset = 
LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());
 
             return Arrays.asList(
-                    Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.LOG_FILE_SUFFIX),
-                    Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.INDEX_FILE_SUFFIX),
-                    Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.TIME_INDEX_FILE_SUFFIX),
-                    Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.TXN_INDEX_FILE_SUFFIX),
-                    Paths.get(rootPath, topicPartitionSubpath, uuid + 
LEADER_EPOCH_CHECKPOINT.getSuffix()),
-                    Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
+                    Paths.get(rootPath, topicPartitionSubpath, startOffset + 
"-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),
+                    Paths.get(rootPath, topicPartitionSubpath, startOffset + 
"-" + uuid + LogFileUtils.INDEX_FILE_SUFFIX),
+                    Paths.get(rootPath, topicPartitionSubpath, startOffset + 
"-" + uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX),
+                    Paths.get(rootPath, topicPartitionSubpath, startOffset + 
"-" + uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX),
+                    Paths.get(rootPath, topicPartitionSubpath, startOffset + 
"-" + uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()),
+                    Paths.get(rootPath, topicPartitionSubpath, startOffset + 
"-" + uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
             );
         }
 
@@ -424,37 +429,37 @@ public final class LocalTieredStorageTest {
             return Paths.get(rootPath, topicPartitionSubpath);
         }
 
-        public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id, 
final Consumer<Path> action) {
-            expectedPaths(id).forEach(action);
+        public void verifyContainsLogSegmentFiles(final 
RemoteLogSegmentMetadata metadata, final Consumer<Path> action) {
+            expectedPaths(metadata).forEach(action);
         }
 
         /**
          * Verify the remote storage contains remote log segment and 
associated files for the provided {@code id}.
          *
-         * @param id The unique ID of the remote log segment and associated 
resources (e.g. offset and time indexes).
+         * @param metadata The metadata of the remote log segment and 
associated resources (e.g. offset and time indexes).
          */
-        public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id) 
{
-            expectedPaths(id).forEach(this::assertFileExists);
+        public void verifyContainsLogSegmentFiles(final 
RemoteLogSegmentMetadata metadata) {
+            expectedPaths(metadata).forEach(this::assertFileExists);
         }
 
         /**
          * Verify the remote storage does NOT contain remote log segment and 
associated files for the provided {@code id}.
          *
-         * @param id The unique ID of the remote log segment and associated 
resources (e.g. offset and time indexes).
+         * @param metadata The metadata of the remote log segment and 
associated resources (e.g. offset and time indexes).
          */
-        public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentId id) {
-            expectedPaths(id).forEach(this::assertFileDoesNotExist);
+        public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentMetadata 
metadata) {
+            expectedPaths(metadata).forEach(this::assertFileDoesNotExist);
         }
 
         /**
          * Compare the content of the remote segment with the provided {@link 
LogSegmentData}.
          * This method does not fetch from the remote storage.
          *
-         * @param id The unique ID of the remote log segment and associated 
resources (e.g. offset and time indexes).
+         * @param metadata The metadata of the remote log segment and 
associated resources (e.g. offset and time indexes).
          * @param seg The segment stored on Kafka's local storage.
          */
-        public void verifyRemoteLogSegmentMatchesLocal(final 
RemoteLogSegmentId id, final LogSegmentData seg) {
-            final Path remoteSegmentPath = expectedPaths(id).get(0);
+        public void verifyRemoteLogSegmentMatchesLocal(final 
RemoteLogSegmentMetadata metadata, final LogSegmentData seg) {
+            final Path remoteSegmentPath = expectedPaths(metadata).get(0);
             assertFileDataEquals(remoteSegmentPath, seg.logSegment());
         }
 
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java
index b6eae36ab26..08f1ddbdf1e 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java
@@ -59,9 +59,9 @@ import static org.slf4j.LoggerFactory.getLogger;
  * the local tiered storage:
  *
  * <code>
- * / storage-directory / topic-partition-uuidBase64 / 
oAtiIQ95REujbuzNd_lkLQ.log
- *                                                  . 
oAtiIQ95REujbuzNd_lkLQ.index
- *                                                  . 
oAtiIQ95REujbuzNd_lkLQ.timeindex
+ * / storage-directory / topic-partition-uuidBase64 / 
00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.log
+ *                                                  . 
00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.index
+ *                                                  . 
00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.timeindex
  * </code>
  */
 public final class RemoteLogSegmentFileset {
@@ -73,9 +73,9 @@ public final class RemoteLogSegmentFileset {
      * The name of each of the files under the scope of a log segment (the log 
file, its indexes, etc.)
      * follows the structure UUID-FileType.
      */
-    private static final Pattern FILENAME_FORMAT = 
compile("([a-zA-Z0-9_-]{22})(\\.[a-z_]+)");
-    private static final int GROUP_UUID = 1;
-    private static final int GROUP_FILE_TYPE = 2;
+    private static final Pattern FILENAME_FORMAT = 
compile("(\\d+-)([a-zA-Z0-9_-]{22})(\\.[a-z_]+)");
+    private static final int GROUP_UUID = 2;
+    private static final int GROUP_FILE_TYPE = 3;
 
     /**
      * Characterises the type of a file in the local tiered storage copied 
from Apache Kafka's standard storage.
@@ -98,10 +98,10 @@ public final class RemoteLogSegmentFileset {
 
         /**
          * Provides the name of the file of this type for the given UUID in 
the local tiered storage,
-         * e.g. uuid.log.
+         * e.g. 0-uuid.log.
          */
-        public String toFilename(final Uuid uuid) {
-            return uuid.toString() + suffix;
+        public String toFilename(final String startOffset, final Uuid uuid) {
+            return startOffset + "-" + uuid.toString() + suffix;
         }
 
         /**
@@ -155,19 +155,21 @@ public final class RemoteLogSegmentFileset {
      * the log segment offloaded are not created on the file system until 
transfer happens.
      *
      * @param storageDir The root directory of the local tiered storage.
-     * @param id Remote log segment id assigned to a log segment in Kafka.
+     * @param metadata Remote log metadata about a topic partition's remote 
log.
      * @return A new fileset instance.
      */
-    public static RemoteLogSegmentFileset openFileset(final File storageDir, 
final RemoteLogSegmentId id) {
+    public static RemoteLogSegmentFileset openFileset(final File storageDir, 
final RemoteLogSegmentMetadata metadata) {
 
-        final RemoteTopicPartitionDirectory tpDir = 
openTopicPartitionDirectory(id.topicIdPartition(), storageDir);
+        final RemoteTopicPartitionDirectory tpDir = 
openTopicPartitionDirectory(
+                metadata.remoteLogSegmentId().topicIdPartition(), storageDir);
         final File partitionDirectory = tpDir.getDirectory();
-        final Uuid uuid = id.id();
+        final Uuid uuid = metadata.remoteLogSegmentId().id();
+        final String startOffset = 
LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());
 
         final Map<RemoteLogSegmentFileType, File> files = 
stream(RemoteLogSegmentFileType.values())
-                .collect(toMap(identity(), type -> new 
File(partitionDirectory, type.toFilename(uuid))));
+                .collect(toMap(identity(), type -> new 
File(partitionDirectory, type.toFilename(startOffset, uuid))));
 
-        return new RemoteLogSegmentFileset(tpDir, id, files);
+        return new RemoteLogSegmentFileset(tpDir, 
metadata.remoteLogSegmentId(), files);
     }
 
     /**
@@ -183,7 +185,7 @@ public final class RemoteLogSegmentFileset {
         try {
             final Map<RemoteLogSegmentFileType, File> files =
                     Files.list(tpDirectory.getDirectory().toPath())
-                            .filter(path -> 
path.getFileName().toString().startsWith(uuid.toString()))
+                            .filter(path -> 
path.getFileName().toString().contains(uuid.toString()))
                             .collect(toMap(path -> 
getFileType(path.getFileName().toString()), Path::toFile));
 
             final Set<RemoteLogSegmentFileType> expectedFileTypes = 
stream(RemoteLogSegmentFileType.values())

Reply via email to