This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a3204aed2eb KAFKA-15194: Prepend offset in the filenames used by LocalTieredStorage (#14057)
a3204aed2eb is described below
commit a3204aed2eb6f2fe7d9f300cca686d3bc2a37c7c
Author: Owen Leung <[email protected]>
AuthorDate: Sat Jul 22 19:47:26 2023 +0800
KAFKA-15194: Prepend offset in the filenames used by LocalTieredStorage (#14057)
Reviewers: Divij Vaidya <[email protected]>
---
.../kafka/storage/internals/log/LogFileUtils.java | 2 +-
.../log/remote/storage/LocalTieredStorage.java | 34 +++++-----
.../log/remote/storage/LocalTieredStorageTest.java | 77 ++++++++++++----------
.../remote/storage/RemoteLogSegmentFileset.java | 34 +++++-----
4 files changed, 77 insertions(+), 70 deletions(-)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
index 2680b2dbe30..2664aeda690 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
@@ -84,7 +84,7 @@ public final class LogFileUtils {
* @param offset The offset to use in the file name
* @return The filename
*/
- private static String filenamePrefixFromOffset(long offset) {
+ public static String filenamePrefixFromOffset(long offset) {
NumberFormat nf = NumberFormat.getInstance();
nf.setMinimumIntegerDigits(20);
nf.setMaximumFractionDigits(0);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index 3a17fcc24de..058d8a5f649 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -81,25 +81,25 @@ import static
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDir
* The local tiered storage keeps a simple structure of directories mimicking
that of Apache Kafka.
* <p>
* The name of each of the files under the scope of a log segment (the log
file, its indexes, etc.)
- * follows the structure UuidBase64-FileType.
+ * follows the structure startOffset-UuidBase64-FileType.
* <p>
* Given the root directory of the storage, segments and associated files are
organized as represented below.
* </p>
* <code>
- * / storage-directory / topic-0-LWgrMmVrT0a__7a4SasuPA /
bCqX9U--S-6U8XUM9II25Q.log
- * . .
bCqX9U--S-6U8XUM9II25Q.index
- * . .
bCqX9U--S-6U8XUM9II25Q.timeindex
- * . .
h956soEzTzi9a-NOQ-DvKA.log
- * . .
h956soEzTzi9a-NOQ-DvKA.index
- * . .
h956soEzTzi9a-NOQ-DvKA.timeindex
+ * / storage-directory / topic-0-LWgrMmVrT0a__7a4SasuPA /
00000000000000000011-bCqX9U--S-6U8XUM9II25Q.log
+ * . .
00000000000000000011-bCqX9U--S-6U8XUM9II25Q.index
+ * . .
00000000000000000011-bCqX9U--S-6U8XUM9II25Q.timeindex
+ * . .
00000000000000000011-h956soEzTzi9a-NOQ-DvKA.log
+ * . .
00000000000000000011-h956soEzTzi9a-NOQ-DvKA.index
+ * . .
00000000000000000011-h956soEzTzi9a-NOQ-DvKA.timeindex
* .
- * / topic-1-LWgrMmVrT0a__7a4SasuPA / o8CQPT86QQmbFmi3xRmiHA.log
- * . . o8CQPT86QQmbFmi3xRmiHA.index
- * . . o8CQPT86QQmbFmi3xRmiHA.timeindex
+ * / topic-1-LWgrMmVrT0a__7a4SasuPA /
00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.log
+ * . .
00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.index
+ * . .
00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.timeindex
* .
- * / btopic-3-DRagLm_PS9Wl8fz1X43zVg / jvj3vhliTGeU90sIosmp_g.log
- * . . jvj3vhliTGeU90sIosmp_g.index
- * . . jvj3vhliTGeU90sIosmp_g.timeindex
+ * / topic-3-DRagLm_PS9Wl8fz1X43zVg /
00000000000000000011-jvj3vhliTGeU90sIosmp_g.log
+ * . .
00000000000000000011-jvj3vhliTGeU90sIosmp_g.index
+ * . .
00000000000000000011-jvj3vhliTGeU90sIosmp_g.timeindex
* </code>
*/
public final class LocalTieredStorage implements RemoteStorageManager {
@@ -310,7 +310,7 @@ public final class LocalTieredStorage implements
RemoteStorageManager {
RemoteLogSegmentFileset fileset = null;
try {
- fileset = openFileset(storageDirectory, id);
+ fileset = openFileset(storageDirectory, metadata);
logger.info("Offloading log segment for {} from segment={}",
id.topicIdPartition(), data.logSegment());
@@ -359,7 +359,7 @@ public final class LocalTieredStorage implements
RemoteStorageManager {
eventBuilder.withStartPosition(startPos).withEndPosition(endPos);
try {
- final RemoteLogSegmentFileset fileset =
openFileset(storageDirectory, metadata.remoteLogSegmentId());
+ final RemoteLogSegmentFileset fileset =
openFileset(storageDirectory, metadata);
final InputStream inputStream =
newInputStream(fileset.getFile(SEGMENT).toPath(), READ);
inputStream.skip(startPos);
@@ -386,7 +386,7 @@ public final class LocalTieredStorage implements
RemoteStorageManager {
final LocalTieredStorageEvent.Builder eventBuilder =
newEventBuilder(eventType, metadata);
try {
- final RemoteLogSegmentFileset fileset =
openFileset(storageDirectory, metadata.remoteLogSegmentId());
+ final RemoteLogSegmentFileset fileset =
openFileset(storageDirectory, metadata);
File file = fileset.getFile(fileType);
final InputStream inputStream = (fileType.isOptional() &&
!file.exists()) ?
@@ -411,7 +411,7 @@ public final class LocalTieredStorage implements
RemoteStorageManager {
if (deleteEnabled) {
try {
final RemoteLogSegmentFileset fileset = openFileset(
- storageDirectory, metadata.remoteLogSegmentId());
+ storageDirectory, metadata);
if (!fileset.delete()) {
throw new RemoteStorageException("Failed to delete
remote log segment with id:" +
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
index 7140684dac2..273ce6ce2a0 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
@@ -123,18 +123,19 @@ public final class LocalTieredStorageTest {
final RemoteLogSegmentMetadata metadata =
newRemoteLogSegmentMetadata(id);
tieredStorage.copyLogSegmentData(metadata, segment);
- remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
+ remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
}
@Test
public void copyDataFromLogSegment() throws RemoteStorageException {
final byte[] data = new byte[]{0, 1, 2};
final RemoteLogSegmentId id = newRemoteLogSegmentId();
+ final RemoteLogSegmentMetadata metadata =
newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment(data);
- tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id),
segment);
+ tieredStorage.copyLogSegmentData(metadata, segment);
- remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(id, segment);
+ remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(metadata,
segment);
}
@Test
@@ -201,41 +202,44 @@ public final class LocalTieredStorageTest {
@Test
public void deleteLogSegment() throws RemoteStorageException {
final RemoteLogSegmentId id = newRemoteLogSegmentId();
+ final RemoteLogSegmentMetadata metadata =
newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment();
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id),
segment);
- remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
+ remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
- remoteStorageVerifier.verifyLogSegmentFilesAbsent(id);
+ remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata);
}
@Test
public void deletePartition() throws RemoteStorageException {
int segmentCount = 10;
- List<RemoteLogSegmentId> segmentIds = new ArrayList<>();
+ List<RemoteLogSegmentMetadata> segmentMetadatas = new ArrayList<>();
for (int i = 0; i < segmentCount; i++) {
final RemoteLogSegmentId id = newRemoteLogSegmentId();
+ final RemoteLogSegmentMetadata metadata =
newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment();
- tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id),
segment);
- remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
- segmentIds.add(id);
+ tieredStorage.copyLogSegmentData(metadata, segment);
+ remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
+ segmentMetadatas.add(metadata);
}
tieredStorage.deletePartition(topicIdPartition);
remoteStorageVerifier.assertFileDoesNotExist(remoteStorageVerifier.expectedPartitionPath());
- for (RemoteLogSegmentId segmentId: segmentIds) {
- remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentId);
+ for (RemoteLogSegmentMetadata segmentMetadata: segmentMetadatas) {
+ remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentMetadata);
}
}
@Test
public void deleteLogSegmentWithoutOptionalFiles() throws
RemoteStorageException {
final RemoteLogSegmentId id = newRemoteLogSegmentId();
+ final RemoteLogSegmentMetadata metadata =
newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment();
segment.transactionIndex().get().toFile().delete();
- tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id),
segment);
- remoteStorageVerifier.verifyContainsLogSegmentFiles(id, path -> {
+ tieredStorage.copyLogSegmentData(metadata, segment);
+ remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata, path -> {
String fileName = path.getFileName().toString();
if (!fileName.contains(LogFileUtils.TXN_INDEX_FILE_SUFFIX)) {
remoteStorageVerifier.assertFileExists(path);
@@ -243,7 +247,7 @@ public final class LocalTieredStorageTest {
});
tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
- remoteStorageVerifier.verifyLogSegmentFilesAbsent(id);
+ remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata);
}
@Test
@@ -252,12 +256,12 @@ public final class LocalTieredStorageTest {
final RemoteLogSegmentId id = newRemoteLogSegmentId();
final LogSegmentData segment = localLogSegments.nextSegment();
-
+ final RemoteLogSegmentMetadata metadata =
newRemoteLogSegmentMetadata(id);
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id),
segment);
- remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
+ remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
- remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
+ remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
}
@Test
@@ -399,20 +403,21 @@ public final class LocalTieredStorageTest {
this.topicIdPartition = requireNonNull(topicIdPartition);
}
- private List<Path> expectedPaths(final RemoteLogSegmentId id) {
+ private List<Path> expectedPaths(final RemoteLogSegmentMetadata
metadata) {
final String rootPath = getStorageRootDirectory();
TopicPartition tp = topicIdPartition.topicPartition();
final String topicPartitionSubpath = format("%s-%d-%s",
tp.topic(), tp.partition(),
topicIdPartition.topicId());
- final String uuid = id.id().toString();
+ final String uuid = metadata.remoteLogSegmentId().id().toString();
+ final String startOffset =
LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());
return Arrays.asList(
- Paths.get(rootPath, topicPartitionSubpath, uuid +
LogFileUtils.LOG_FILE_SUFFIX),
- Paths.get(rootPath, topicPartitionSubpath, uuid +
LogFileUtils.INDEX_FILE_SUFFIX),
- Paths.get(rootPath, topicPartitionSubpath, uuid +
LogFileUtils.TIME_INDEX_FILE_SUFFIX),
- Paths.get(rootPath, topicPartitionSubpath, uuid +
LogFileUtils.TXN_INDEX_FILE_SUFFIX),
- Paths.get(rootPath, topicPartitionSubpath, uuid +
LEADER_EPOCH_CHECKPOINT.getSuffix()),
- Paths.get(rootPath, topicPartitionSubpath, uuid +
LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
+ Paths.get(rootPath, topicPartitionSubpath, startOffset +
"-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),
+ Paths.get(rootPath, topicPartitionSubpath, startOffset +
"-" + uuid + LogFileUtils.INDEX_FILE_SUFFIX),
+ Paths.get(rootPath, topicPartitionSubpath, startOffset +
"-" + uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX),
+ Paths.get(rootPath, topicPartitionSubpath, startOffset +
"-" + uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX),
+ Paths.get(rootPath, topicPartitionSubpath, startOffset +
"-" + uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()),
+ Paths.get(rootPath, topicPartitionSubpath, startOffset +
"-" + uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
);
}
@@ -424,37 +429,37 @@ public final class LocalTieredStorageTest {
return Paths.get(rootPath, topicPartitionSubpath);
}
- public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id,
final Consumer<Path> action) {
- expectedPaths(id).forEach(action);
+ public void verifyContainsLogSegmentFiles(final
RemoteLogSegmentMetadata metadata, final Consumer<Path> action) {
+ expectedPaths(metadata).forEach(action);
}
/**
* Verify the remote storage contains remote log segment and
associated files for the provided {@code id}.
*
- * @param id The unique ID of the remote log segment and associated
resources (e.g. offset and time indexes).
+ * @param metadata The metadata of the remote log segment and
associated resources (e.g. offset and time indexes).
*/
- public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id)
{
- expectedPaths(id).forEach(this::assertFileExists);
+ public void verifyContainsLogSegmentFiles(final
RemoteLogSegmentMetadata metadata) {
+ expectedPaths(metadata).forEach(this::assertFileExists);
}
/**
* Verify the remote storage does NOT contain remote log segment and
associated files for the provided {@code id}.
*
- * @param id The unique ID of the remote log segment and associated
resources (e.g. offset and time indexes).
+ * @param metadata The metadata of the remote log segment and
associated resources (e.g. offset and time indexes).
*/
- public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentId id) {
- expectedPaths(id).forEach(this::assertFileDoesNotExist);
+ public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentMetadata
metadata) {
+ expectedPaths(metadata).forEach(this::assertFileDoesNotExist);
}
/**
* Compare the content of the remote segment with the provided {@link
LogSegmentData}.
* This method does not fetch from the remote storage.
*
- * @param id The unique ID of the remote log segment and associated
resources (e.g. offset and time indexes).
+ * @param metadata The metadata of the remote log segment and
associated resources (e.g. offset and time indexes).
* @param seg The segment stored on Kafka's local storage.
*/
- public void verifyRemoteLogSegmentMatchesLocal(final
RemoteLogSegmentId id, final LogSegmentData seg) {
- final Path remoteSegmentPath = expectedPaths(id).get(0);
+ public void verifyRemoteLogSegmentMatchesLocal(final
RemoteLogSegmentMetadata metadata, final LogSegmentData seg) {
+ final Path remoteSegmentPath = expectedPaths(metadata).get(0);
assertFileDataEquals(remoteSegmentPath, seg.logSegment());
}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java
index b6eae36ab26..08f1ddbdf1e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java
@@ -59,9 +59,9 @@ import static org.slf4j.LoggerFactory.getLogger;
* the local tiered storage:
*
* <code>
- * / storage-directory / topic-partition-uuidBase64 /
oAtiIQ95REujbuzNd_lkLQ.log
- * .
oAtiIQ95REujbuzNd_lkLQ.index
- * .
oAtiIQ95REujbuzNd_lkLQ.timeindex
+ * / storage-directory / topic-partition-uuidBase64 /
00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.log
+ * .
00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.index
+ * .
00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.timeindex
* </code>
*/
public final class RemoteLogSegmentFileset {
@@ -73,9 +73,9 @@ public final class RemoteLogSegmentFileset {
* The name of each of the files under the scope of a log segment (the log
file, its indexes, etc.)
* follows the structure UUID-FileType.
*/
- private static final Pattern FILENAME_FORMAT =
compile("([a-zA-Z0-9_-]{22})(\\.[a-z_]+)");
- private static final int GROUP_UUID = 1;
- private static final int GROUP_FILE_TYPE = 2;
+ private static final Pattern FILENAME_FORMAT =
compile("(\\d+-)([a-zA-Z0-9_-]{22})(\\.[a-z_]+)");
+ private static final int GROUP_UUID = 2;
+ private static final int GROUP_FILE_TYPE = 3;
/**
* Characterises the type of a file in the local tiered storage copied
from Apache Kafka's standard storage.
@@ -98,10 +98,10 @@ public final class RemoteLogSegmentFileset {
/**
* Provides the name of the file of this type for the given UUID in
the local tiered storage,
- * e.g. uuid.log.
+ * e.g. 0-uuid.log.
*/
- public String toFilename(final Uuid uuid) {
- return uuid.toString() + suffix;
+ public String toFilename(final String startOffset, final Uuid uuid) {
+ return startOffset + "-" + uuid.toString() + suffix;
}
/**
@@ -155,19 +155,21 @@ public final class RemoteLogSegmentFileset {
* the log segment offloaded are not created on the file system until
transfer happens.
*
* @param storageDir The root directory of the local tiered storage.
- * @param id Remote log segment id assigned to a log segment in Kafka.
+ * @param metadata Remote log metadata about a topic partition's remote
log.
* @return A new fileset instance.
*/
- public static RemoteLogSegmentFileset openFileset(final File storageDir,
final RemoteLogSegmentId id) {
+ public static RemoteLogSegmentFileset openFileset(final File storageDir,
final RemoteLogSegmentMetadata metadata) {
- final RemoteTopicPartitionDirectory tpDir =
openTopicPartitionDirectory(id.topicIdPartition(), storageDir);
+ final RemoteTopicPartitionDirectory tpDir =
openTopicPartitionDirectory(
+ metadata.remoteLogSegmentId().topicIdPartition(), storageDir);
final File partitionDirectory = tpDir.getDirectory();
- final Uuid uuid = id.id();
+ final Uuid uuid = metadata.remoteLogSegmentId().id();
+ final String startOffset =
LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());
final Map<RemoteLogSegmentFileType, File> files =
stream(RemoteLogSegmentFileType.values())
- .collect(toMap(identity(), type -> new
File(partitionDirectory, type.toFilename(uuid))));
+ .collect(toMap(identity(), type -> new
File(partitionDirectory, type.toFilename(startOffset, uuid))));
- return new RemoteLogSegmentFileset(tpDir, id, files);
+ return new RemoteLogSegmentFileset(tpDir,
metadata.remoteLogSegmentId(), files);
}
/**
@@ -183,7 +185,7 @@ public final class RemoteLogSegmentFileset {
try {
final Map<RemoteLogSegmentFileType, File> files =
Files.list(tpDirectory.getDirectory().toPath())
- .filter(path ->
path.getFileName().toString().startsWith(uuid.toString()))
+ .filter(path ->
path.getFileName().toString().contains(uuid.toString()))
.collect(toMap(path ->
getFileType(path.getFileName().toString()), Path::toFile));
final Set<RemoteLogSegmentFileType> expectedFileTypes =
stream(RemoteLogSegmentFileType.values())