divijvaidya commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1247912475


##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteTopicPartitionDirectory.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static java.util.Objects.requireNonNull;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getUuid;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteFilesOnly;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteQuietly;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents a topic-partition directory in the local tiered storage under which filesets for
+ * log segments are stored.
+ *
+ *
+ * <code>
+ * / storage-directory / uuidBase64-0-topic / tvHCaSDsQZWsjr5rbtCjxA-segment
+ *                     .                   .  tvHCaSDsQZWsjr5rbtCjxA-offset_index
+ *                     .                   .  tvHCaSDsQZWsjr5rbtCjxA-time_index
+ *                     .
+ *                     / 5fEBmixCR5-dMntYSLIr1g-3-topic / BFyXlC8ySMm-Uzxw5lZSMg-segment
+ *                                                      . BFyXlC8ySMm-Uzxw5lZSMg-offset_index
+ *                                                      . BFyXlC8ySMm-Uzxw5lZSMg-time_index
+ * </code>
+ */
+public final class RemoteTopicPartitionDirectory {
+    private static final Logger LOGGER = getLogger(RemoteTopicPartitionDirectory.class);
+    private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";

Review Comment:
   `_` (underscore) is not a valid UUID char as per RFC 4122 (https://www.ietf.org/rfc/rfc4122.txt).
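
   For context, a minimal sketch (not part of the PR) contrasting the two textual forms at play here: RFC 4122's canonical form is hex digits and dashes only, while the 22-char pattern above matches a URL-safe Base64 alphabet, where `-` and `_` are legal (which, if I read `Uuid.toString()` right, is the encoding being parsed here):

   ```java
   import java.util.Base64;
   import java.util.UUID;
   import java.util.regex.Pattern;

   public class UuidFormSketch {
       // Canonical RFC 4122 textual form: hex digits and dashes, no underscore.
       static final Pattern RFC_4122 = Pattern.compile(
               "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}");
       // 22-char URL-safe Base64 form, as matched by UUID_LEGAL_CHARS above.
       static final Pattern BASE64_URL = Pattern.compile("[a-zA-Z0-9_-]{22}");

       public static void main(String[] args) {
           System.out.println(RFC_4122.matcher(UUID.randomUUID().toString()).matches()); // true
           // URL-safe Base64 of 16 raw bytes (128 bits) is exactly 22 chars without padding.
           String encoded = Base64.getUrlEncoder().withoutPadding().encodeToString(new byte[16]);
           System.out.println(BASE64_URL.matcher(encoded).matches()); // true
       }
   }
   ```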



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local file system to store
+ * offloaded log segments and associated data.
+ * <p>
+ * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded.
+ * </p>
+ * <p>
+ * In order to guarantee isolation, independence, reproducibility and consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via the storage ID provided to
+ * the constructor, should be limited to a test or well-defined self-contained use-case.
+ * </p>
+ * <p>
+ * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
+ * <p>
+ * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+ * follows the structure UuidBase64-FileType.
+ * <p>
+ * Given the root directory of the storage, segments and associated files are organized as represented below.
+ * </p>
+ * <code>
+ * / storage-directory  / LWgrMmVrT0a__7a4SasuPA-0-topic / bCqX9U--S-6U8XUM9II25Q-segment
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-offset_index
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-time_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-segment
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-offset_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-time_index
+ * .
+ * / LWgrMmVrT0a__7a4SasuPA-1-topic / o8CQPT86QQmbFmi3xRmiHA-segment
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-offset_index
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-time_index
+ * .
+ * / DRagLm_PS9Wl8fz1X43zVg-3-btopic / jvj3vhliTGeU90sIosmp_g-segment
+ * .                                 . jvj3vhliTGeU90sIosmp_g-offset_index
+ * .                                 . jvj3vhliTGeU90sIosmp_g-time_index
+ * </code>
+ */
+public final class LocalTieredStorage implements RemoteStorageManager {
+
+    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
+
+    /**
+     * The root directory of this storage.
+     */
+    public static final String STORAGE_DIR_PROP = "dir";
+
+    /**
+     * Delete all files and directories from this storage on close, effectively removing it
+     * entirely from the file system.
+     */
+    public static final String DELETE_ON_CLOSE_PROP = "delete.on.close";
+
+    /**
+     * The implementation of the transfer of the data of the canonical segment and index files to
+     * this storage. The only reason the "transferer" abstraction exists is to be able to simulate
+     * file copy errors and exercise the associated failure modes.
+     */
+    public static final String TRANSFERER_CLASS_PROP = "transferer";
+
+    /**
+     * Whether the deleteLogSegment() implemented by this storage should actually delete data or behave
+     * as a no-operation. This allows simulating non-strongly consistent storage systems which do not
+     * guarantee visibility of a successful delete for subsequent read or list operations.
+     */
+    public static final String ENABLE_DELETE_API_PROP = "delete.enable";
+
+    /**
+     * The ID of the broker which owns this instance of {@link LocalTieredStorage}.
+     */
+    public static final String BROKER_ID = "broker.id";

Review Comment:
   Use `KafkaConfig.BrokerIdProp`?
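   
   If this test module can depend on the server config constants, a sketch of what that could look like (assumption: `KafkaConfig.BrokerIdProp` resolves to the string "broker.id"; from Java, the Scala object member would be reached via `KafkaConfig$.MODULE$`):
   
   ```java
   // Sketch only, assuming a dependency on the core module is acceptable here:
   // source the property name from the server config instead of redeclaring it.
   public static final String BROKER_ID = kafka.server.KafkaConfig$.MODULE$.BrokerIdProp();
   ```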



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local file system to store
+ * offloaded log segments and associated data.
+ * <p>
+ * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded.
+ * </p>
+ * <p>
+ * In order to guarantee isolation, independence, reproducibility and consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via the storage ID provided to
+ * the constructor, should be limited to a test or well-defined self-contained use-case.
+ * </p>
+ * <p>
+ * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
+ * <p>
+ * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+ * follows the structure UuidBase64-FileType.
+ * <p>
+ * Given the root directory of the storage, segments and associated files are organized as represented below.
+ * </p>
+ * <code>
+ * / storage-directory  / LWgrMmVrT0a__7a4SasuPA-0-topic / bCqX9U--S-6U8XUM9II25Q-segment
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-offset_index
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-time_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-segment
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-offset_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-time_index
+ * .
+ * / LWgrMmVrT0a__7a4SasuPA-1-topic / o8CQPT86QQmbFmi3xRmiHA-segment
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-offset_index
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-time_index
+ * .
+ * / DRagLm_PS9Wl8fz1X43zVg-3-btopic / jvj3vhliTGeU90sIosmp_g-segment
+ * .                                 . jvj3vhliTGeU90sIosmp_g-offset_index
+ * .                                 . jvj3vhliTGeU90sIosmp_g-time_index
+ * </code>
+ */
+public final class LocalTieredStorage implements RemoteStorageManager {
+
+    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
+
+    /**
+     * The root directory of this storage.
+     */
+    public static final String STORAGE_DIR_PROP = "dir";
+
+    /**
+     * Delete all files and directories from this storage on close, effectively removing it
+     * entirely from the file system.
+     */
+    public static final String DELETE_ON_CLOSE_PROP = "delete.on.close";
+
+    /**
+     * The implementation of the transfer of the data of the canonical segment and index files to
+     * this storage. The only reason the "transferer" abstraction exists is to be able to simulate
+     * file copy errors and exercise the associated failure modes.
+     */
+    public static final String TRANSFERER_CLASS_PROP = "transferer";
+
+    /**
+     * Whether the deleteLogSegment() implemented by this storage should actually delete data or behave
+     * as a no-operation. This allows simulating non-strongly consistent storage systems which do not
+     * guarantee visibility of a successful delete for subsequent read or list operations.
+     */
+    public static final String ENABLE_DELETE_API_PROP = "delete.enable";
+
+    /**
+     * The ID of the broker which owns this instance of {@link LocalTieredStorage}.
+     */
+    public static final String BROKER_ID = "broker.id";
+
+    private static final String ROOT_STORAGES_DIR_NAME = "kafka-tiered-storage";
+
+    private volatile File storageDirectory;
+    private volatile boolean deleteOnClose = false;
+    private volatile boolean deleteEnabled = true;
+    private volatile Transferer transferer = new Transferer() {
+        @Override
+        public void transfer(File from, File to) throws IOException {
+            if (from.exists()) {
+                Files.copy(from.toPath(), to.toPath());
+            }
+        }
+
+        @Override
+        public void transfer(ByteBuffer from, File to) throws IOException {
+            if (from != null && from.hasRemaining()) {
+                try (FileOutputStream fileOutputStream = new FileOutputStream(to, false);
+                     FileChannel channel = fileOutputStream.getChannel()) {
+                    channel.write(from);
+                }
+            }
+        }
+    };
+
+    private volatile int brokerId = -1;
+
+    private volatile Logger logger = LoggerFactory.getLogger(LocalTieredStorage.class);
+
+    /**
+     * Used to establish a chronological ordering of the events generated by the local tiered storage
+     * which this instance gives access to.
+     */
+    // TODO: Make this timestamp only dependent on the assigned broker, not the class instance.
+    private final AtomicInteger eventTimestamp = new AtomicInteger(-1);
+
+    /**
+     * Used to notify users of this storage of internal updates - new topic-partition recorded (upon directory
+     * creation) and segment file written (upon segment file write(2)).
+     */
+    private final LocalTieredStorageListeners storageListeners = new LocalTieredStorageListeners();
+
+    private final LocalTieredStorageHistory history = new LocalTieredStorageHistory();
+
+    public LocalTieredStorage() {
+        history.listenTo(this);
+    }
+
+    /**
+     * Walks through this storage and notifies the traverser of every topic-partition, segment and record discovered.
+     * <p>
+     * - The order of traversal of the topic-partitions is not specified.
+     * - The order of traversal of the segments within a topic-partition is in ascending order
+     * of the modified timestamp of the segment file.
+     * - The order of traversal of records within a segment corresponds to the insertion
+     * order of these records in the original segment from which the segment in this storage
+     * was transferred.
+     * <p>
+     * This method is NOT an atomic operation w.r.t the local tiered storage. This storage may change while
+     * it is being traversed, as topic-partitions, segments and records are communicated to the traverser. There is
+     * no guarantee that updates to the storage which happen during traversal will be communicated to the traverser.
+     * Especially, in case of concurrent read and write/delete to a topic-partition, a segment or a record,
+     * the behaviour depends on the underlying file system.
+     *
+     * @param traverser User-specified object to which this storage communicates the topic-partitions,
+     *                  segments and records as they are discovered.
+     */
+    public void traverse(final LocalTieredStorageTraverser traverser) {
+        Objects.requireNonNull(traverser);
+
+        final File[] files = storageDirectory.listFiles();
+        if (files == null) {
+            // files can be null if the directory is empty.
+            return;
+        }
+
+        Arrays.stream(files)
+                .filter(File::isDirectory)
+                .forEach(dir ->
+                        openExistingTopicPartitionDirectory(dir.getName(), storageDirectory).traverse(traverser));
+    }
+
+    public void addListener(final LocalTieredStorageListener listener) {
+        this.storageListeners.add(listener);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (storageDirectory != null) {
+            throw new InvalidConfigurationException(format("This instance of local remote storage " +
+                    "is already configured. The existing storage directory is %s. Ensure the method " +
+                    "configure() is only called once.", storageDirectory.getAbsolutePath()));
+        }
+
+        final String storageDir = (String) configs.get(STORAGE_DIR_PROP);
+        final String shouldDeleteOnClose = (String) configs.get(DELETE_ON_CLOSE_PROP);
+        final String transfererClass = (String) configs.get(TRANSFERER_CLASS_PROP);
+        final String isDeleteEnabled = (String) configs.get(ENABLE_DELETE_API_PROP);
+        final Integer brokerIdInt = (Integer) configs.get(BROKER_ID);
+
+        if (brokerIdInt == null) {
+            throw new InvalidConfigurationException(
+                    "Broker ID is required to configure the LocalTieredStorage 
manager.");
+        }
+
+        brokerId = brokerIdInt;
+        logger = new LogContext(format("[LocalTieredStorage Id=%d] ", brokerId)).logger(this.getClass());
+
+        if (shouldDeleteOnClose != null) {
+            deleteOnClose = Boolean.parseBoolean(shouldDeleteOnClose);
+        }
+
+        if (isDeleteEnabled != null) {
+            deleteEnabled = Boolean.parseBoolean(isDeleteEnabled);
+        }
+
+        if (transfererClass != null) {
+            try {
+                transferer = (Transferer) getClass().getClassLoader().loadClass(transfererClass).newInstance();
+
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) {
+                throw new RuntimeException(format("Cannot create transferer from class '%s'", transfererClass), e);
+            }
+        }
+
+
+        if (storageDir == null) {
+            storageDirectory = TestUtils.tempDirectory(ROOT_STORAGES_DIR_NAME + "-");

Review Comment:
   This will be deleted on exit because TestUtils.tempDirectory registers `file.deleteOnExit()`. Is this the behaviour we want when we pass deleteOnClose=false?
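   
   A minimal sketch of a fallback that would not register a JVM-exit hook, so the directory created in the no-`dir` case is governed solely by deleteOnClose (assumes plain `Files.createTempDirectory` is acceptable in this test utility):
   
   ```java
   // Sketch: no file.deleteOnExit(), so cleanup happens only via close() when
   // deleteOnClose=true; the directory survives JVM exit otherwise.
   if (storageDir == null) {
       try {
           storageDirectory = Files.createTempDirectory(ROOT_STORAGES_DIR_NAME + "-").toFile();
       } catch (IOException e) {
           throw new RuntimeException("Failed to create temporary storage directory", e);
       }
       logger.debug("No storage directory specified, created temporary directory: {}",
               storageDirectory.getAbsolutePath());
   }
   ```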



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local file system to store
+ * offloaded log segments and associated data.
+ * <p>
+ * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded.
+ * </p>
+ * <p>
+ * In order to guarantee isolation, independence, reproducibility and consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via the storage ID provided to
+ * the constructor, should be limited to a test or well-defined self-contained use-case.
+ * </p>
+ * <p>
+ * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
+ * <p>
+ * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+ * follows the structure UuidBase64-FileType.
+ * <p>
+ * Given the root directory of the storage, segments and associated files are organized as represented below.
+ * </p>
+ * <code>
+ * / storage-directory  / LWgrMmVrT0a__7a4SasuPA-0-topic / bCqX9U--S-6U8XUM9II25Q-segment
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-offset_index
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-time_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-segment
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-offset_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-time_index
+ * .
+ * / LWgrMmVrT0a__7a4SasuPA-1-topic / o8CQPT86QQmbFmi3xRmiHA-segment
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-offset_index
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-time_index
+ * .
+ * / DRagLm_PS9Wl8fz1X43zVg-3-btopic / jvj3vhliTGeU90sIosmp_g-segment
+ * .                                 . jvj3vhliTGeU90sIosmp_g-offset_index
+ * .                                 . jvj3vhliTGeU90sIosmp_g-time_index
+ * </code>
+ */
+public final class LocalTieredStorage implements RemoteStorageManager {
+
+    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
+
+    /**
+     * The root directory of this storage.
+     */
+    public static final String STORAGE_DIR_PROP = "dir";
+
+    /**
+     * Delete all files and directories from this storage on close, effectively removing it
+     * entirely from the file system.
+     */
+    public static final String DELETE_ON_CLOSE_PROP = "delete.on.close";
+
+    /**
+     * The implementation of the transfer of the data of the canonical segment and index files to
+     * this storage. The only reason the "transferer" abstraction exists is to be able to simulate
+     * file copy errors and exercise the associated failure modes.
+     */
+    public static final String TRANSFERER_CLASS_PROP = "transferer";
+
+    /**
+     * Whether the deleteLogSegment() implemented by this storage should actually delete data or behave
+     * as a no-operation. This allows simulating non-strongly consistent storage systems which do not
+     * guarantee visibility of a successful delete for subsequent read or list operations.
+     */
+    public static final String ENABLE_DELETE_API_PROP = "delete.enable";
+
+    /**
+     * The ID of the broker which owns this instance of {@link LocalTieredStorage}.
+     */
+    public static final String BROKER_ID = "broker.id";
+
+    private static final String ROOT_STORAGES_DIR_NAME = "kafka-tiered-storage";
+
+    private volatile File storageDirectory;
+    private volatile boolean deleteOnClose = false;
+    private volatile boolean deleteEnabled = true;
+    private volatile Transferer transferer = new Transferer() {
+        @Override
+        public void transfer(File from, File to) throws IOException {
+            if (from.exists()) {
+                Files.copy(from.toPath(), to.toPath());
+            }
+        }
+
+        @Override
+        public void transfer(ByteBuffer from, File to) throws IOException {
+            if (from != null && from.hasRemaining()) {
+                try (FileOutputStream fileOutputStream = new FileOutputStream(to, false);
+                     FileChannel channel = fileOutputStream.getChannel()) {
+                    channel.write(from);
+                }
+            }
+        }
+    };
+
+    private volatile int brokerId = -1;
+
+    private volatile Logger logger = LoggerFactory.getLogger(LocalTieredStorage.class);
+
+    /**
+     * Used to establish a chronological ordering of the events generated by the local tiered storage
+     * which this instance gives access to.
+     */
+    // TODO: Make this timestamp only dependent on the assigned broker, not the class instance.
+    private final AtomicInteger eventTimestamp = new AtomicInteger(-1);
+
+    /**
+     * Used to notify users of this storage of internal updates - new topic-partition recorded (upon directory
+     * creation) and segment file written (upon segment file write(2)).
+     */
+    private final LocalTieredStorageListeners storageListeners = new LocalTieredStorageListeners();
+
+    private final LocalTieredStorageHistory history = new LocalTieredStorageHistory();
+
+    public LocalTieredStorage() {
+        history.listenTo(this);
+    }
+
+    /**
+     * Walks through this storage and notifies the traverser of every topic-partition, segment and record discovered.
+     * <p>
+     * - The order of traversal of the topic-partitions is not specified.
+     * - The order of traversal of the segments within a topic-partition is in ascending order
+     * of the modified timestamp of the segment file.
+     * - The order of traversal of records within a segment corresponds to the insertion
+     * order of these records in the original segment from which the segment in this storage
+     * was transferred.
+     * <p>
+     * This method is NOT an atomic operation w.r.t the local tiered storage. This storage may change while
+     * it is being traversed, as topic-partitions, segments and records are communicated to the traverser. There is
+     * no guarantee that updates to the storage which happen during traversal will be communicated to the traverser.
+     * Especially, in case of concurrent read and write/delete to a topic-partition, a segment or a record,
+     * the behaviour depends on the underlying file system.
+     *
+     * @param traverser User-specified object to which this storage communicates the topic-partitions,
+     *                  segments and records as they are discovered.
+     */
+    public void traverse(final LocalTieredStorageTraverser traverser) {
+        Objects.requireNonNull(traverser);
+
+        final File[] files = storageDirectory.listFiles();
+        if (files == null) {
+            // files can be null if the directory is empty.
+            return;
+        }
+
+        Arrays.stream(files)
+                .filter(File::isDirectory)
+                .forEach(dir ->
+                        openExistingTopicPartitionDirectory(dir.getName(), storageDirectory).traverse(traverser));
+    }
+
+    public void addListener(final LocalTieredStorageListener listener) {
+        this.storageListeners.add(listener);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (storageDirectory != null) {
+            throw new InvalidConfigurationException(format("This instance of local remote storage " +
+                    "is already configured. The existing storage directory is %s. Ensure the method " +
+                    "configure() is only called once.", storageDirectory.getAbsolutePath()));
+        }
+
+        final String storageDir = (String) configs.get(STORAGE_DIR_PROP);
+        final String shouldDeleteOnClose = (String) configs.get(DELETE_ON_CLOSE_PROP);
+        final String transfererClass = (String) configs.get(TRANSFERER_CLASS_PROP);
+        final String isDeleteEnabled = (String) configs.get(ENABLE_DELETE_API_PROP);
+        final Integer brokerIdInt = (Integer) configs.get(BROKER_ID);
+
+        if (brokerIdInt == null) {
+            throw new InvalidConfigurationException(
+                    "Broker ID is required to configure the LocalTieredStorage 
manager.");
+        }
+
+        brokerId = brokerIdInt;
+        logger = new LogContext(format("[LocalTieredStorage Id=%d] ", brokerId)).logger(this.getClass());
+
+        if (shouldDeleteOnClose != null) {
+            deleteOnClose = Boolean.parseBoolean(shouldDeleteOnClose);
+        }
+
+        if (isDeleteEnabled != null) {
+            deleteEnabled = Boolean.parseBoolean(isDeleteEnabled);
+        }
+
+        if (transfererClass != null) {
+            try {
+                transferer = (Transferer) getClass().getClassLoader().loadClass(transfererClass).newInstance();
+
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) {
+                throw new RuntimeException(format("Cannot create transferer from class '%s'", transfererClass), e);
+            }
+        }
+
+
+        if (storageDir == null) {
+            storageDirectory = TestUtils.tempDirectory(ROOT_STORAGES_DIR_NAME + "-");
+
+            logger.debug("No storage directory specified, created temporary 
directory: {}",
+                    storageDirectory.getAbsolutePath());
+
+        } else {
+            storageDirectory = new File(storageDir + "/" + ROOT_STORAGES_DIR_NAME);
+            // NOTE: Provide the relative storage directory path to view the files in the same directory when running tests.
+            // storageDirectory = new File(new File("."), ROOT_STORAGES_DIR_NAME + "/" + storageDir);
+            final boolean existed = storageDirectory.exists();
+
+            if (!existed) {
+                logger.info("Creating directory: [{}]", 
storageDirectory.getAbsolutePath());
+                storageDirectory.mkdirs();

Review Comment:
   Please use `Files.createDirectories`. The (relatively) new Files API introduced in JDK 7 has better error handling semantics.
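   
   A sketch of the suggested change, mirroring the surrounding names (`Files.createDirectories` reports the failure cause via `IOException` instead of returning `false`, and is a no-op when the directory already exists):
   
   ```java
   // Sketch: replaces the exists()/mkdirs() pair. Creation failures surface as a
   // descriptive IOException rather than an ignorable boolean return value.
   try {
       Files.createDirectories(storageDirectory.toPath());
   } catch (IOException e) {
       throw new RuntimeException(
               format("Failed to create storage directory %s", storageDirectory.getAbsolutePath()), e);
   }
   ```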



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.lang.String.format;
+import static java.util.Arrays.stream;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getFileType;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents the set of files offloaded to the local tiered storage for a single log segment.
+ * A {@link RemoteLogSegmentFileset} corresponds to the leaves of the file system structure of
+ * the local tiered storage:
+ *
+ * <code>
+ * / storage-directory / uuidBase64-partition-topic / oAtiIQ95REujbuzNd_lkLQ-segment
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-offset_index
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-time_index
+ * </code>
+ */
+public final class RemoteLogSegmentFileset {
+
+    /**
+     * The format of a file which belongs to the fileset, i.e. a file which is assigned to a log segment in
+     * Kafka's log directory.
+     *
+     * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+     * follows the structure UUID-FileType.
+     */
+    private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";
+    private static final Pattern FILENAME_FORMAT = compile("(" + UUID_LEGAL_CHARS + ")-([a-z_]+)");
+    private static final int GROUP_UUID = 1;
+    private static final int GROUP_FILE_TYPE = 2;
+
+    /**
+     * Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage.
+     */
+    public enum RemoteLogSegmentFileType {
+        SEGMENT(false),
+        OFFSET_INDEX(false),
+        TIME_INDEX(false),
+        TRANSACTION_INDEX(true),
+        LEADER_EPOCH_CHECKPOINT(false),
+        PRODUCER_SNAPSHOT(true);
+
+        private final boolean optional;
+
+        RemoteLogSegmentFileType(boolean optional) {
+            this.optional = optional;
+        }
+
+        /**
+         * Provides the name of the file of this type for the given UUID in the local tiered storage,
+         * e.g. uuid-segment.
+         */
+        public String toFilename(final Uuid uuid) {
+            return format("%s-%s", uuid.toString(), 
name().toLowerCase(Locale.ROOT));
+        }
+
+        /**
+         * Returns the nature of the data stored in the file with the provided name.
+         */
+        public static RemoteLogSegmentFileType getFileType(final String filename) {
+            try {
+                return RemoteLogSegmentFileType.valueOf(substr(filename, GROUP_FILE_TYPE).toUpperCase(Locale.ROOT));
+
+            } catch (final RuntimeException e) {
+                throw new IllegalArgumentException(format("Not a remote log 
segment file: %s", filename), e);
+            }
+        }
+
+        /**
+         * Extracts the UUID from the filename. This UUID is that of the remote log segment id which uniquely
+         * identifies the log segment to which the file's data belongs (not necessarily segment data, but also
+         * indexes or other associated files).
+         */
+        public static Uuid getUuid(final String filename) {
+            return Uuid.fromString(substr(filename, GROUP_UUID));
+        }
+
+        static String substr(final String filename, final int group) {
+            final Matcher m = FILENAME_FORMAT.matcher(filename);
+            if (!m.matches()) {
+                throw new IllegalArgumentException(format("Not a remote log 
segment file: %s", filename));
+            }
+            return m.group(group);
+        }
+
+        public boolean isOptional() {
+            return optional;
+        }
+    }
+
+    private static final Logger LOGGER = getLogger(RemoteLogSegmentFileset.class);
+
+    private final RemoteTopicPartitionDirectory partitionDirectory;
+    private final RemoteLogSegmentId remoteLogSegmentId;
+    private final Map<RemoteLogSegmentFileType, File> files;
+
+    /**
+     * Creates a new fileset located under the given storage directory for the provided remote log segment id.
+     * The topic-partition directory is created if it does not exist yet. However, the files corresponding to
+     * the offloaded log segment are not created on the file system until transfer happens.
+     *
+     * @param storageDir The root directory of the local tiered storage.
+     * @param id Remote log segment id assigned to a log segment in Kafka.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentId id) {
+
+        final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory(id.topicIdPartition(), storageDir);
+        final File partitionDirectory = tpDir.getDirectory();
+        final Uuid uuid = id.id();
+
+        final Map<RemoteLogSegmentFileType, File> files = 
stream(RemoteLogSegmentFileType.values())
+                .collect(toMap(identity(), type -> new 
File(partitionDirectory, type.toFilename(uuid))));
+
+        return new RemoteLogSegmentFileset(tpDir, id, files);
+    }
+
+    /**
+     * Creates a fileset instance for the physical set of files located under the given topic-partition directory.
+     * The fileset MUST exist on the file system with the given uuid.
+     *
+     * @param tpDirectory The topic-partition directory which this fileset's segment belongs to.
+     * @param uuid The expected UUID of the fileset.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openExistingFileset(final RemoteTopicPartitionDirectory tpDirectory,
+                                                              final Uuid uuid) {
+        final Map<RemoteLogSegmentFileType, File> files =
+                stream(tpDirectory.getDirectory().listFiles())

Review Comment:
   Please use the newer API `Files.list()`.
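   
   A sketch of what that could look like at this call site (hypothetical continuation, since the hunk is cut off here; assumes `java.nio.file.Files`, `java.nio.file.Path` and `java.util.stream.Stream` imports are added). `Files.list` throws `IOException` instead of returning `null`, and the returned stream holds a directory handle, so it must be closed:
   
   ```java
   // Sketch: Files.list() instead of File.listFiles(). The try-with-resources
   // releases the underlying directory handle once the listing is consumed.
   final Map<RemoteLogSegmentFileType, File> files;
   try (Stream<Path> paths = Files.list(tpDirectory.getDirectory().toPath())) {
       files = paths
               .map(Path::toFile)
               .filter(file -> uuid.equals(RemoteLogSegmentFileType.getUuid(file.getName())))
               .collect(toMap(file -> getFileType(file.getName()), identity()));
   } catch (IOException e) {
       throw new RuntimeException(e);
   }
   ```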



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local file system to store
+ * offloaded log segments and associated data.
+ * <p>
+ * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded.
+ * </p>
+ * <p>
+ * In order to guarantee isolation, independence, reproducibility and consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via the storage ID provided to
+ * the constructor, should be limited to a test or well-defined self-contained use-case.
+ * </p>
+ * <p>
+ * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
+ * <p>
+ * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+ * follows the structure UuidBase64-FileType.
+ * <p>
+ * Given the root directory of the storage, segments and associated files are organized as represented below.
+ * </p>
+ * <code>
+ * / storage-directory  / LWgrMmVrT0a__7a4SasuPA-0-topic / bCqX9U--S-6U8XUM9II25Q-segment
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-offset_index
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-time_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-segment
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-offset_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-time_index
+ * .
+ * / LWgrMmVrT0a__7a4SasuPA-1-topic / o8CQPT86QQmbFmi3xRmiHA-segment
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-offset_index
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-time_index
+ * .
+ * / DRagLm_PS9Wl8fz1X43zVg-3-btopic / jvj3vhliTGeU90sIosmp_g-segment
+ * .                                 . jvj3vhliTGeU90sIosmp_g-offset_index
+ * .                                 . jvj3vhliTGeU90sIosmp_g-time_index
+ * </code>
+ */
+public final class LocalTieredStorage implements RemoteStorageManager {
+
+    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
+
+    /**
+     * The root directory of this storage.
+     */
+    public static final String STORAGE_DIR_PROP = "dir";
+
+    /**
+     * Delete all files and directories from this storage on close, effectively removing it
+     * entirely from the file system.
+     */
+    public static final String DELETE_ON_CLOSE_PROP = "delete.on.close";
+
+    /**
+     * The implementation of the transfer of the data of the canonical segment and index files to
+     * this storage. The only reason the "transferer" abstraction exists is to be able to simulate
+     * file copy errors and exercise the associated failure modes.
+     */
+    public static final String TRANSFERER_CLASS_PROP = "transferer";
+
+    /**
+     * Whether the deleteLogSegment() implemented by this storage should actually delete data or behave
+     * as a no-operation. This allows simulating non-strongly consistent storage systems which do not
+     * guarantee visibility of a successful delete for subsequent read or list operations.
+     */
+    public static final String ENABLE_DELETE_API_PROP = "delete.enable";
+
+    /**
+     * The ID of the broker which owns this instance of {@link LocalTieredStorage}.
+     */
+    public static final String BROKER_ID = "broker.id";
+
+    private static final String ROOT_STORAGES_DIR_NAME = "kafka-tiered-storage";
+
+    private volatile File storageDirectory;
+    private volatile boolean deleteOnClose = false;
+    private volatile boolean deleteEnabled = true;
+    private volatile Transferer transferer = new Transferer() {
+        @Override
+        public void transfer(File from, File to) throws IOException {
+            if (from.exists()) {
+                Files.copy(from.toPath(), to.toPath());
+            }
+        }
+
+        @Override
+        public void transfer(ByteBuffer from, File to) throws IOException {
+            if (from != null && from.hasRemaining()) {
+                try (FileOutputStream fileOutputStream = new FileOutputStream(to, false);
+                     FileChannel channel = fileOutputStream.getChannel()) {
+                    channel.write(from);
+                }
+            }
+        }
+    };
+
+    private volatile int brokerId = -1;
+
+    private volatile Logger logger = 
LoggerFactory.getLogger(LocalTieredStorage.class);
+
+    /**
+     * Used to establish a chronological ordering of the events generated by the local tiered storage
+     * which this instance gives access to.
+     */
+    // TODO: Make this timestamp dependent only on the assigned broker, not the class instance.
+    private final AtomicInteger eventTimestamp = new AtomicInteger(-1);
+
+    /**
+     * Used to notify users of this storage of internal updates - new 
topic-partition recorded (upon directory
+     * creation) and segment file written (upon segment file write(2)).
+     */
+    private final LocalTieredStorageListeners storageListeners = new 
LocalTieredStorageListeners();
+
+    private final LocalTieredStorageHistory history = new 
LocalTieredStorageHistory();
+
+    public LocalTieredStorage() {
+        history.listenTo(this);
+    }
+
+    /**
+     * Walks through this storage and notifies the traverser of every topic-partition, segment and record discovered.
+     * <p>
+     * - The order of traversal of the topic-partitions is not specified.
+     * - The segments within a topic-partition are traversed in ascending order
+     * of the modified timestamp of the segment file.
+     * - The records within a segment are traversed in the insertion
+     * order of these records in the original segment from which the segment in this storage
+     * was transferred.
+     * <p>
+     * This method is NOT an atomic operation w.r.t. the local tiered storage. This storage may change while
+     * topic-partitions, segments and records are being communicated to the traverser. There is
+     * no guarantee that updates to the storage which happen during traversal will be communicated to the traverser.
+     * In particular, in case of concurrent reads and writes/deletes on a topic-partition, a segment or a record,
+     * the behaviour depends on the underlying file system.
+     *
+     * @param traverser User-specified object to which this storage 
communicates the topic-partitions,
+     *                  segments and records as they are discovered.
+     */
+    public void traverse(final LocalTieredStorageTraverser traverser) {
+        Objects.requireNonNull(traverser);
+
+        final File[] files = storageDirectory.listFiles();
+        if (files == null) {
+            // files can be null if the directory is empty.
+            return;
+        }
+
+        Arrays.stream(files)
+                .filter(File::isDirectory)
+                .forEach(dir ->
+                        openExistingTopicPartitionDirectory(dir.getName(), 
storageDirectory).traverse(traverser));
+    }
+
+    public void addListener(final LocalTieredStorageListener listener) {
+        this.storageListeners.add(listener);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (storageDirectory != null) {
+            throw new InvalidConfigurationException(format("This instance of local remote storage " +
+                    "is already configured. The existing storage directory is %s. Ensure the method " +
+                    "configure() is only called once.", storageDirectory.getAbsolutePath()));
+        }
+
+        final String storageDir = (String) configs.get(STORAGE_DIR_PROP);
+        final String shouldDeleteOnClose = (String) 
configs.get(DELETE_ON_CLOSE_PROP);
+        final String transfererClass = (String) 
configs.get(TRANSFERER_CLASS_PROP);
+        final String isDeleteEnabled = (String) 
configs.get(ENABLE_DELETE_API_PROP);
+        final Integer brokerIdInt = (Integer) configs.get(BROKER_ID);
+
+        if (brokerIdInt == null) {
+            throw new InvalidConfigurationException(
+                    "Broker ID is required to configure the LocalTieredStorage 
manager.");
+        }
+
+        brokerId = brokerIdInt;
+        logger = new LogContext(format("[LocalTieredStorage Id=%d] ", 
brokerId)).logger(this.getClass());
+
+        if (shouldDeleteOnClose != null) {
+            deleteOnClose = Boolean.parseBoolean(shouldDeleteOnClose);
+        }
+
+        if (isDeleteEnabled != null) {
+            deleteEnabled = Boolean.parseBoolean(isDeleteEnabled);
+        }
+
+        if (transfererClass != null) {
+            try {
+                transferer = (Transferer) 
getClass().getClassLoader().loadClass(transfererClass).newInstance();
+
+            } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException | ClassCastException e) {
+                throw new RuntimeException(format("Cannot create transferer 
from class '%s'", transfererClass), e);
+            }
+        }
+
+
+        if (storageDir == null) {
+            storageDirectory = TestUtils.tempDirectory(ROOT_STORAGES_DIR_NAME 
+ "-");
+
+            logger.debug("No storage directory specified, created temporary 
directory: {}",
+                    storageDirectory.getAbsolutePath());
+
+        } else {
+            storageDirectory = new File(storageDir + "/" + 
ROOT_STORAGES_DIR_NAME);
+            // NOTE: Provide the relative storage directory path to view the 
files in the same directory when running tests.
+            // storageDirectory = new File(new File("."), 
ROOT_STORAGES_DIR_NAME + "/" + storageDir);
+            final boolean existed = storageDirectory.exists();

Review Comment:
   please use `Files.exists()`, which was introduced with the NIO2 APIs in JDK 7. It has better error-handling semantics.
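
   A minimal sketch of what that could look like here, reusing `storageDirectory` from this diff (`java.nio.file.Files` is already imported):

   ```java
   // NIO2 equivalent of File#exists(), with saner error-handling semantics.
   final boolean existed = Files.exists(storageDirectory.toPath());
   ```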



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/Transferer.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface Transferer {

Review Comment:
   a Javadoc describing the purpose of this interface would be very useful here
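
   For instance, borrowing the wording of the `TRANSFERER_CLASS_PROP` javadoc in `LocalTieredStorage`, it could read along these lines (a sketch, not a prescription):

   ```java
   /**
    * Transfers the content of a canonical segment or index file, or of an
    * in-memory buffer, to a destination file in the local tiered storage.
    * This abstraction exists so that tests can plug in implementations which
    * simulate file copy errors and exercise the associated failure modes.
    */
   public interface Transferer {

       void transfer(File from, File to) throws IOException;

       void transfer(ByteBuffer from, File to) throws IOException;
   }
   ```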



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java:
##########
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.text.NumberFormat;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.nio.ByteBuffer.wrap;
+import static java.util.Arrays.asList;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot.takeSnapshot;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.LEADER_EPOCH;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.OFFSET;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.TIMESTAMP;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.TRANSACTION;
+
+public final class LocalTieredStorageTest {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LocalTieredStorageTest.class);
+
+    private final LocalLogSegments localLogSegments = new LocalLogSegments();
+    private final TopicPartition topicPartition = new 
TopicPartition("my-topic", 1);
+    private final TopicIdPartition topicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), topicPartition);
+
+    private LocalTieredStorage tieredStorage;
+    private Verifier remoteStorageVerifier;
+    private String storageDir;
+
+    private void init(Map<String, Object> extraConfig, String testName) {
+        tieredStorage = new LocalTieredStorage();
+        remoteStorageVerifier = new Verifier(tieredStorage, topicIdPartition);
+        storageDir = generateStorageId(testName);
+
+        Map<String, Object> config = new HashMap<>();
+        config.put(LocalTieredStorage.STORAGE_DIR_PROP, storageDir);
+        config.put(LocalTieredStorage.DELETE_ON_CLOSE_PROP, "true");
+        config.put(LocalTieredStorage.BROKER_ID, 1);
+        config.putAll(extraConfig);
+
+        tieredStorage.configure(config);
+    }
+
+    @BeforeEach
+    public void before(TestInfo testInfo) {
+        init(Collections.emptyMap(), testInfo.getDisplayName());
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        tieredStorage.clear();
+        localLogSegments.deleteAll();
+        Files.deleteIfExists(Paths.get(storageDir));
+    }
+
+    @Test
+    public void copyEmptyLogSegment() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+        final RemoteLogSegmentMetadata metadata = 
newRemoteLogSegmentMetadata(id);
+        tieredStorage.copyLogSegmentData(metadata, segment);
+
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, segment);
+    }
+
+    @Test
+    public void copyDataFromLogSegment() throws RemoteStorageException {
+        final byte[] data = new byte[]{0, 1, 2};
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment(data);
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+
+        remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(id, segment);
+    }
+
+    @Test
+    public void fetchLogSegment() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment(new 
byte[]{0, 1, 2});
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+
+        remoteStorageVerifier.verifyFetchedLogSegment(id, 0, new byte[]{0, 1, 
2});
+        //FIXME: Fetch at arbitrary index does not work as proper support for 
records need to be added.
+    }
+
+    @Test
+    public void fetchOffsetIndex() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+
+        remoteStorageVerifier.verifyFetchedOffsetIndex(id, 
LocalLogSegments.OFFSET_FILE_BYTES);
+    }
+
+    @Test
+    public void fetchTimeIndex() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+
+        remoteStorageVerifier.verifyFetchedTimeIndex(id, 
LocalLogSegments.TIME_FILE_BYTES);
+    }
+
+    @Test
+    public void fetchTransactionIndex() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+
+        remoteStorageVerifier.verifyFetchedTransactionIndex(id, 
LocalLogSegments.TXN_FILE_BYTES);
+    }
+
+    @Test
+    public void fetchLeaderEpochCheckpoint() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+
+        remoteStorageVerifier.verifyLeaderEpochCheckpoint(id, 
LocalLogSegments.LEADER_EPOCH_CHECKPOINT_FILE_BYTES);
+    }
+
+    @Test
+    public void fetchProducerSnapshot() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+
+        remoteStorageVerifier.verifyProducerSnapshot(id, 
LocalLogSegments.PRODUCER_SNAPSHOT_FILE_BYTES);
+    }
+
+    @Test
+    public void deleteLogSegment() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, segment);
+
+        tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
+        remoteStorageVerifier.verifyLogSegmentFilesAbsent(id, segment);
+    }
+
+    @Test
+    public void deletePartition() throws RemoteStorageException {
+        int segmentCount = 10;
+        List<RemoteLogSegmentId> segmentIds = new ArrayList<>();
+        for (int i = 0; i < segmentCount; i++) {
+            final RemoteLogSegmentId id = newRemoteLogSegmentId();
+            final LogSegmentData segment = localLogSegments.nextSegment();
+            tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+            remoteStorageVerifier.verifyContainsLogSegmentFiles(id, segment);
+            segmentIds.add(id);
+        }
+        tieredStorage.deletePartition(topicIdPartition);
+        
remoteStorageVerifier.assertFileDoesNotExist(remoteStorageVerifier.expectedPartitionPath());
+        for (RemoteLogSegmentId segmentId: segmentIds) {
+            remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentId, null);
+        }
+    }
+
+    @Test
+    public void deleteLogSegmentWithoutOptionalFiles() throws 
RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+        segment.transactionIndex().get().toFile().delete();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, path -> {
+            String fileName = path.getFileName().toString();
+            if (!(fileName.contains("transaction_index") || 
fileName.contains("snapshot"))) {
+                remoteStorageVerifier.assertFileExists(path);
+            }
+        });
+
+        tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
+        remoteStorageVerifier.verifyLogSegmentFilesAbsent(id, segment);
+    }
+
+    @Test
+    public void segmentsAreNotDeletedIfDeleteApiIsDisabled(TestInfo testInfo) 
throws RemoteStorageException {
+        
init(Collections.singletonMap(LocalTieredStorage.ENABLE_DELETE_API_PROP, 
"false"), testInfo.getDisplayName());
+
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, segment);
+
+        tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, segment);
+    }
+
+    @Test
+    public void traverseSingleOffloadedRecord() throws RemoteStorageException {
+        final byte[] bytes = new byte[]{0, 1, 2};
+
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment(bytes);
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
segment);
+
+        tieredStorage.traverse(new LocalTieredStorageTraverser() {
+            @Override
+            public void visitTopicIdPartition(TopicIdPartition 
topicIdPartition) {
+                assertEquals(LocalTieredStorageTest.this.topicPartition, 
topicIdPartition.topicPartition());
+            }
+
+            @Override
+            public void visitSegment(RemoteLogSegmentFileset fileset) {
+                assertEquals(id, fileset.getRemoteLogSegmentId());
+
+                try {
+                    final FileRecords records = 
FileRecords.open(fileset.getFile(SEGMENT));
+                    final Iterator<Record> it = records.records().iterator();
+
+                    assertEquals(wrap(bytes), it.next().value());
+
+                } catch (IOException e) {
+                    throw new AssertionError(e);
+                }
+            }
+        });
+    }
+
+    @Test
+    public void traverseMultipleOffloadedRecordsInOneSegment() throws 
RemoteStorageException, IOException {
+        final byte[] record1 = new byte[]{0, 1, 2};
+        final byte[] record2 = new byte[]{3, 4, 5};
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), 
localLogSegments.nextSegment(record1, record2));
+
+        final LocalTieredStorageSnapshot snapshot = 
takeSnapshot(tieredStorage);
+
+        assertEquals(asList(topicPartition), snapshot.getTopicPartitions());
+        assertEquals(asList(wrap(record1), wrap(record2)), 
extractRecordsValue(snapshot, id));
+    }
+
+    @Test
+    public void traverseMultipleOffloadedRecordsInTwoSegments() throws 
RemoteStorageException, IOException {
+        final byte[] record1a = new byte[]{0, 1, 2};
+        final byte[] record2a = new byte[]{3, 4, 5};
+        final byte[] record1b = new byte[]{6, 7, 8};
+        final byte[] record2b = new byte[]{9, 10, 11};
+
+        final RemoteLogSegmentId idA = newRemoteLogSegmentId();
+        final RemoteLogSegmentId idB = newRemoteLogSegmentId();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(idA), 
localLogSegments.nextSegment(record1a, record2a));
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(idB), 
localLogSegments.nextSegment(record1b, record2b));
+
+        final LocalTieredStorageSnapshot snapshot = 
takeSnapshot(tieredStorage);
+
+        final Map<RemoteLogSegmentId, List<ByteBuffer>> expected = new 
HashMap<>();
+        expected.put(idA, asList(wrap(record1a), wrap(record2a)));
+        expected.put(idB, asList(wrap(record1b), wrap(record2b)));
+
+        final Map<RemoteLogSegmentId, List<ByteBuffer>> actual = new 
HashMap<>();
+        actual.put(idA, extractRecordsValue(snapshot, idA));
+        actual.put(idB, extractRecordsValue(snapshot, idB));
+
+        assertEquals(asList(topicPartition), snapshot.getTopicPartitions());
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void fetchThrowsIfDataDoesNotExist() {
+        final RemoteLogSegmentMetadata metadata = 
newRemoteLogSegmentMetadata(newRemoteLogSegmentId());
+
+        assertThrows(RemoteResourceNotFoundException.class,
+            () -> tieredStorage.fetchLogSegment(metadata, 0, 
Integer.MAX_VALUE));
+        assertThrows(RemoteResourceNotFoundException.class, () -> 
tieredStorage.fetchIndex(metadata, OFFSET));
+        assertThrows(RemoteResourceNotFoundException.class, () -> 
tieredStorage.fetchIndex(metadata, TIMESTAMP));
+        assertThrows(RemoteResourceNotFoundException.class, () -> 
tieredStorage.fetchIndex(metadata, LEADER_EPOCH));
+
+        try {
+            assertArrayEquals(new byte[0], 
remoteStorageVerifier.readFully(tieredStorage.fetchIndex(metadata, 
TRANSACTION)));
+            assertArrayEquals(new byte[0], 
remoteStorageVerifier.readFully(tieredStorage.fetchIndex(metadata, 
PRODUCER_SNAPSHOT)));
+        } catch (Exception ex) {
+            fail("Shouldn't have thrown an exception when optional file 
doesn't exists in the remote store");
+        }
+    }
+
+    @Test
+    public void assertStartAndEndPositionConsistency() {
+        final RemoteLogSegmentMetadata metadata = 
newRemoteLogSegmentMetadata(newRemoteLogSegmentId());
+
+        assertThrows(IllegalArgumentException.class, () -> 
tieredStorage.fetchLogSegment(metadata, -1, Integer.MAX_VALUE));
+        assertThrows(IllegalArgumentException.class, () -> 
tieredStorage.fetchLogSegment(metadata, 1, -1));
+        assertThrows(IllegalArgumentException.class, () -> 
tieredStorage.fetchLogSegment(metadata, 2, 1));
+    }
+
+    private RemoteLogSegmentMetadata newRemoteLogSegmentMetadata(final 
RemoteLogSegmentId id) {
+        return new RemoteLogSegmentMetadata(id, 0, 0, -1L, -1, 1000L,
+                1024, Collections.singletonMap(0, 0L));
+    }
+
+    private RemoteLogSegmentId newRemoteLogSegmentId() {
+        return new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
+    }
+
+    private static List<ByteBuffer> extractRecordsValue(
+            final LocalTieredStorageSnapshot snapshot,
+            final RemoteLogSegmentId id) throws IOException {
+
+        final FileRecords records = FileRecords.open(snapshot.getFile(id, 
SEGMENT));
+        final List<ByteBuffer> buffers = new ArrayList<>();
+
+        for (Record record: records.records()) {
+            buffers.add(record.value());
+        }
+
+        return buffers;
+    }
+
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH:mm:ss");
+
+    private String generateStorageId(String testName) {
+        return format("%s-%s-%s",
+                getClass().getSimpleName(), testName, 
DATE_TIME_FORMATTER.format(LocalDateTime.now()));
+    }
+
+    public final class Verifier {
+        private final LocalTieredStorage remoteStorage;
+        private final TopicIdPartition topicIdPartition;
+
+        public Verifier(final LocalTieredStorage remoteStorage, final 
TopicIdPartition topicIdPartition) {
+            this.remoteStorage = requireNonNull(remoteStorage);
+            this.topicIdPartition = requireNonNull(topicIdPartition);
+        }
+
+        private List<Path> expectedPaths(final RemoteLogSegmentId id) {
+            final String rootPath = getStorageRootDirectory();
+            TopicPartition tp = topicIdPartition.topicPartition();
+            final String topicPartitionSubpath = format("%s-%d-%s", 
topicIdPartition.topicId(),
+                   tp.partition(), tp.topic());
+            final String uuid = id.id().toString();
+
+            return Arrays.asList(
+                    Paths.get(rootPath, topicPartitionSubpath, uuid + 
"-segment"),

Review Comment:
   these suffixes should be moved to constants
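
   For example (constant names are illustrative, not from this PR):

   ```java
   // Hypothetical constants for the fileset file-name suffixes.
   private static final String SEGMENT_SUFFIX = "-segment";
   private static final String OFFSET_INDEX_SUFFIX = "-offset_index";
   private static final String TIME_INDEX_SUFFIX = "-time_index";

   // ... which expectedPaths() could then reuse:
   Paths.get(rootPath, topicPartitionSubpath, uuid + SEGMENT_SUFFIX);
   ```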



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.lang.String.format;
+import static java.util.Arrays.stream;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toMap;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getFileType;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents the set of files offloaded to the local tiered storage for a 
single log segment.
+ * A {@link RemoteLogSegmentFileset} corresponds to the leaves of the file 
system structure of
+ * the local tiered storage:
+ *
+ * <code>
+ * / storage-directory / uuidBase64-partition-topic / 
oAtiIQ95REujbuzNd_lkLQ-segment
+ *                                                  . 
oAtiIQ95REujbuzNd_lkLQ-offset_index
+ *                                                  . 
oAtiIQ95REujbuzNd_lkLQ-time_index
+ * </code>
+ */
+public final class RemoteLogSegmentFileset {
+
+    /**
+     * The format of a file which belongs to the fileset, i.e. a file which is 
assigned to a log segment in
+     * Kafka's log directory.
+     *
+     * The name of each of the files under the scope of a log segment (the log 
file, its indexes, etc.)
+     * follows the structure UUID-FileType.
+     */
+    private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";
+    private static final Pattern FILENAME_FORMAT = compile("(" + 
UUID_LEGAL_CHARS + ")-([a-z_]+)");
+    private static final int GROUP_UUID = 1;
+    private static final int GROUP_FILE_TYPE = 2;
+
+    /**
+     * Characterises the type of a file in the local tiered storage copied 
from Apache Kafka's standard storage.
+     */
+    public enum RemoteLogSegmentFileType {
+        SEGMENT(false),
+        OFFSET_INDEX(false),
+        TIME_INDEX(false),
+        TRANSACTION_INDEX(true),
+        LEADER_EPOCH_CHECKPOINT(false),
+        PRODUCER_SNAPSHOT(true);

Review Comment:
   Current code expects the producer snapshot to be present (although this is incorrect, because segments uploaded prior to 2.8 won't have it), hence this is a mandatory entry. The code fails at
   https://github.com/apache/kafka/blob/43574beb972d47d696e0de077f453b36ce148026/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L592
   with a NullPointerException if the producer snapshot is not available.
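
   If this test storage should mirror that (strict) behaviour, the enum entry here would simply become non-optional, e.g.:

   ```java
   // Marked mandatory to match RemoteLogManager's current expectation, even
   // though segments uploaded prior to 2.8 may lack the snapshot.
   PRODUCER_SNAPSHOT(false);
   ```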
   
   



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteTopicPartitionDirectory.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static java.util.Objects.requireNonNull;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getUuid;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteFilesOnly;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteQuietly;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents a topic-partition directory in the local tiered storage under 
which filesets for
+ * log segments are stored.
+ *
+ *
+ * <code>
+ * / storage-directory / uuidBase64-0-topic / tvHCaSDsQZWsjr5rbtCjxA-segment
+ *                     .                   .  
tvHCaSDsQZWsjr5rbtCjxA-offset_index
+ *                     .                   .  tvHCaSDsQZWsjr5rbtCjxA-time_index
+ *                     .
+ *                     / 5fEBmixCR5-dMntYSLIr1g-3-topic / 
BFyXlC8ySMm-Uzxw5lZSMg-segment
+ *                                                      . 
BFyXlC8ySMm-Uzxw5lZSMg-offset_index

Review Comment:
   Can we use `UnifiedLog.IndexFileSuffix` (and similar for the offset and trx indexes) here, which uses `.` instead of an underscore to name the files? This would bring consistency with how we store the file names in the log and the RemoteIndexCache.
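
   A rough sketch of what `toFilename` could become; how the Scala `UnifiedLog` object's suffix accessors are reached from Java is an assumption here:

   ```java
   // Map file types to Kafka's standard suffixes instead of lowercase enum names.
   public String toFilename(final Uuid uuid) {
       switch (this) {
           case OFFSET_INDEX:
               return uuid + UnifiedLog.IndexFileSuffix();     // ".index"
           case TIME_INDEX:
               return uuid + UnifiedLog.TimeIndexFileSuffix(); // ".timeindex"
           case TRANSACTION_INDEX:
               return uuid + UnifiedLog.TxnIndexFileSuffix();  // ".txnindex"
           default:
               return format("%s-%s", uuid, name().toLowerCase(Locale.ROOT));
       }
   }
   ```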



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.lang.String.format;
+import static java.util.Arrays.stream;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toMap;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getFileType;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents the set of files offloaded to the local tiered storage for a 
single log segment.
+ * A {@link RemoteLogSegmentFileset} corresponds to the leaves of the file 
system structure of
+ * the local tiered storage:
+ *
+ * <code>
+ * / storage-directory / uuidBase64-partition-topic / 
oAtiIQ95REujbuzNd_lkLQ-segment
+ *                                                  . 
oAtiIQ95REujbuzNd_lkLQ-offset_index
+ *                                                  . 
oAtiIQ95REujbuzNd_lkLQ-time_index
+ * </code>
+ */
+public final class RemoteLogSegmentFileset {
+
+    /**
+     * The format of a file which belongs to the fileset, i.e. a file which is 
assigned to a log segment in
+     * Kafka's log directory.
+     *
+     * The name of each of the files under the scope of a log segment (the log 
file, its indexes, etc.)
+     * follows the structure UUID-FileType.
+     */
+    private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";
+    private static final Pattern FILENAME_FORMAT = compile("(" + 
UUID_LEGAL_CHARS + ")-([a-z_]+)");
+    private static final int GROUP_UUID = 1;
+    private static final int GROUP_FILE_TYPE = 2;
+
+    /**
+     * Characterises the type of a file in the local tiered storage copied 
from Apache Kafka's standard storage.
+     */
+    public enum RemoteLogSegmentFileType {
+        SEGMENT(false),
+        OFFSET_INDEX(false),
+        TIME_INDEX(false),
+        TRANSACTION_INDEX(true),
+        LEADER_EPOCH_CHECKPOINT(false),
+        PRODUCER_SNAPSHOT(true);
+
+        private final boolean optional;
+
+        RemoteLogSegmentFileType(boolean optional) {
+            this.optional = optional;
+        }
+
+        /**
+         * Provides the name of the file of this type for the given UUID in 
the local tiered storage,
+         * e.g. uuid-segment.
+         */
+        public String toFilename(final Uuid uuid) {
+            return format("%s-%s", uuid.toString(), 
name().toLowerCase(Locale.ROOT));
+        }
+
+        /**
+         * Returns the nature of the data stored in the file with the provided 
name.
+         */
+        public static RemoteLogSegmentFileType getFileType(final String 
filename) {
+            try {
+                return RemoteLogSegmentFileType.valueOf(substr(filename, 
GROUP_FILE_TYPE).toUpperCase(Locale.ROOT));
+
+            } catch (final RuntimeException e) {
+                throw new IllegalArgumentException(format("Not a remote log 
segment file: %s", filename), e);
+            }
+        }
+
+        /**
+         * Extracts the UUID from the filename. This UUID is that of the remote log segment id, which uniquely
+         * identifies the log segment to which the file's data belongs (not necessarily segment data, but also
+         * indexes or other associated files).
+         */
+        public static Uuid getUuid(final String filename) {
+            return Uuid.fromString(substr(filename, GROUP_UUID));
+        }
+
+        static String substr(final String filename, final int group) {
+            final Matcher m = FILENAME_FORMAT.matcher(filename);
+            if (!m.matches()) {
+                throw new IllegalArgumentException(format("Not a remote log 
segment file: %s", filename));
+            }
+            return m.group(group);
+        }
+
+        public boolean isOptional() {
+            return optional;
+        }
+    }
+
+    private static final Logger LOGGER = 
getLogger(RemoteLogSegmentFileset.class);
+
+    private final RemoteTopicPartitionDirectory partitionDirectory;
+    private final RemoteLogSegmentId remoteLogSegmentId;
+    private final Map<RemoteLogSegmentFileType, File> files;
+
+    /**
+     * Creates a new fileset located under the given storage directory for the 
provided remote log segment id.
+     * The topic-partition directory is created if it does not exist yet. However, the files corresponding to
+     * the offloaded log segment are not created on the file system until the transfer happens.
+     *
+     * @param storageDir The root directory of the local tiered storage.
+     * @param id Remote log segment id assigned to a log segment in Kafka.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openFileset(final File storageDir, 
final RemoteLogSegmentId id) {
+
+        final RemoteTopicPartitionDirectory tpDir = 
openTopicPartitionDirectory(id.topicIdPartition(), storageDir);
+        final File partitionDirectory = tpDir.getDirectory();
+        final Uuid uuid = id.id();
+
+        final Map<RemoteLogSegmentFileType, File> files = 
stream(RemoteLogSegmentFileType.values())
+                .collect(toMap(identity(), type -> new 
File(partitionDirectory, type.toFilename(uuid))));
+
+        return new RemoteLogSegmentFileset(tpDir, id, files);
+    }
+
+    /**
+     * Creates a fileset instance for the physical set of files located under 
the given topic-partition directory.
+     * The fileset MUST exist on the file system with the given uuid.
+     *
+     * @param tpDirectory The topic-partition directory which this fileset's 
segment belongs to.
+     * @param uuid The expected UUID of the fileset.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openExistingFileset(final 
RemoteTopicPartitionDirectory tpDirectory,
+                                                              final Uuid uuid) 
{
+        final Map<RemoteLogSegmentFileType, File> files =
+                stream(tpDirectory.getDirectory().listFiles())

Review Comment:
   also filter for non-directories
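
   For example, the filtering could be pushed into the listing itself:

   ```java
   // listFiles(FileFilter) keeps regular files only, so sub-directories
   // never reach the fileset map (it may still return null on I/O error).
   final File[] entries = tpDirectory.getDirectory().listFiles(File::isFile);
   ```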



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local 
file system to store
+ * offloaded log segments and associated data.
+ * <p>
+ * Due to the consistency semantic of POSIX-compliant file systems, this 
remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the 
copy to the storage succeeded.
+ * </p>
+ * <p>
+ * In order to guarantee isolation, independence, reproducibility and 
consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via 
the storage ID provided to
+ * the constructor, should be limited to a test or well-defined self-contained 
use-case.
+ * </p>
+ * <p>
+ * The local tiered storage keeps a simple structure of directories mimicking 
that of Apache Kafka.
+ * <p>
+ * The name of each of the files under the scope of a log segment (the log 
file, its indexes, etc.)
+ * follows the structure UuidBase64-FileType.
+ * <p>
+ * Given the root directory of the storage, segments and associated files are 
organized as represented below.
+ * </p>
+ * <code>
+ * / storage-directory  / LWgrMmVrT0a__7a4SasuPA-0-topic / bCqX9U--S-6U8XUM9II25Q-segment
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-offset_index
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-time_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-segment
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-offset_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-time_index
+ * .
+ * / LWgrMmVrT0a__7a4SasuPA-1-topic / o8CQPT86QQmbFmi3xRmiHA-segment
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-offset_index
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-time_index
+ * .
+ * / DRagLm_PS9Wl8fz1X43zVg-3-btopic / jvj3vhliTGeU90sIosmp_g-segment
+ * .                                 . jvj3vhliTGeU90sIosmp_g-offset_index
+ * .                                 . jvj3vhliTGeU90sIosmp_g-time_index
+ * </code>
+ */
+public final class LocalTieredStorage implements RemoteStorageManager {
+
+    public static final String STORAGE_CONFIG_PREFIX = 
"remote.log.storage.local.";
+
+    /**
+     * The root directory of this storage.
+     */
+    public static final String STORAGE_DIR_PROP = "dir";
+
+    /**
+     * Delete all files and directories from this storage on close, effectively removing it
+     * entirely from the file system.
+     */
+    public static final String DELETE_ON_CLOSE_PROP = "delete.on.close";
+
+    /**
+     * The implementation of the transfer of the data of the canonical segment 
and index files to
+     * this storage. The only reason the "transferer" abstraction exists is to 
be able to simulate
+     * file copy errors and exercise the associated failure modes.
+     */
+    public static final String TRANSFERER_CLASS_PROP = "transferer";
+
+    /**
+     * Whether the deleteLogSegment() implemented by this storage should 
actually delete data or behave
+     * as a no-operation. This makes it possible to simulate storage systems without strong consistency, which do not
+     * guarantee visibility of a successful delete for subsequent read or list 
operations.
+     */
+    public static final String ENABLE_DELETE_API_PROP = "delete.enable";
+
+    /**
+     * The ID of the broker which owns this instance of {@link 
LocalTieredStorage}.
+     */
+    public static final String BROKER_ID = "broker.id";
+
+    private static final String ROOT_STORAGES_DIR_NAME = 
"kafka-tiered-storage";
+
+    private volatile File storageDirectory;
+    private volatile boolean deleteOnClose = false;
+    private volatile boolean deleteEnabled = true;
+    private volatile Transferer transferer = new Transferer() {
+        @Override
+        public void transfer(File from, File to) throws IOException {
+            if (from.exists()) {
+                Files.copy(from.toPath(), to.toPath());
+            }
+        }
+
+        @Override
+        public void transfer(ByteBuffer from, File to) throws IOException {
+            if (from != null && from.hasRemaining()) {
+                try (FileOutputStream fileOutputStream = new 
FileOutputStream(to, false);
+                     FileChannel channel = fileOutputStream.getChannel()) {
+                    channel.write(from);
+                }
+            }
+        }
+    };
+
+    private volatile int brokerId = -1;
+
+    private volatile Logger logger = 
LoggerFactory.getLogger(LocalTieredStorage.class);
+
+    /**
+     * Used to establish a chronological ordering of the events generated by the local tiered storage
+     * which this instance gives access to.
+     */
+    // TODO: Make this timestamp dependent only on the assigned broker, not the class instance.
+    private final AtomicInteger eventTimestamp = new AtomicInteger(-1);
+
+    /**
+     * Used to notify users of this storage of internal updates - new 
topic-partition recorded (upon directory
+     * creation) and segment file written (upon segment file write(2)).
+     */
+    private final LocalTieredStorageListeners storageListeners = new 
LocalTieredStorageListeners();
+
+    private final LocalTieredStorageHistory history = new 
LocalTieredStorageHistory();
+
+    public LocalTieredStorage() {
+        history.listenTo(this);
+    }
+
+    /**
+     * Walks through this storage and notifies the traverser of every topic-partition, segment and record discovered.
+     * <p>
+     * - The order of traversal of the topic-partitions is not specified.
+     * - The segments within a topic-partition are traversed in ascending order
+     * of the modified timestamp of the segment file.
+     * - The records within a segment are traversed in the insertion
+     * order of these records in the original segment from which the segment in this storage
+     * was transferred.
+     * <p>
+     * This method is NOT an atomic operation w.r.t. the local tiered storage. This storage may change while
+     * topic-partitions, segments and records are being communicated to the traverser. There is
+     * no guarantee that updates to the storage which happen during traversal will be communicated to the traverser.
+     * In particular, in case of concurrent reads and writes/deletes on a topic-partition, a segment or a record,
+     * the behaviour depends on the underlying file system.
+     *
+     * @param traverser User-specified object to which this storage 
communicates the topic-partitions,
+     *                  segments and records as they are discovered.
+     */
+    public void traverse(final LocalTieredStorageTraverser traverser) {
+        Objects.requireNonNull(traverser);
+
+        final File[] files = storageDirectory.listFiles();
+        if (files == null) {
+            // files can be null if the directory is empty.
+            return;
+        }
+
+        Arrays.stream(files)
+                .filter(File::isDirectory)
+                .forEach(dir ->
+                        openExistingTopicPartitionDirectory(dir.getName(), 
storageDirectory).traverse(traverser));
+    }
+
+    public void addListener(final LocalTieredStorageListener listener) {
+        this.storageListeners.add(listener);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (storageDirectory != null) {
+            throw new InvalidConfigurationException(format("This instance of local remote storage " +
+                    "is already configured. The existing storage directory is %s. Ensure the method " +
+                    "configure() is only called once.", storageDirectory.getAbsolutePath()));
+        }
+
+        final String storageDir = (String) configs.get(STORAGE_DIR_PROP);
+        final String shouldDeleteOnClose = (String) 
configs.get(DELETE_ON_CLOSE_PROP);
+        final String transfererClass = (String) 
configs.get(TRANSFERER_CLASS_PROP);
+        final String isDeleteEnabled = (String) 
configs.get(ENABLE_DELETE_API_PROP);
+        final Integer brokerIdInt = (Integer) configs.get(BROKER_ID);
+
+        if (brokerIdInt == null) {
+            throw new InvalidConfigurationException(
+                    "Broker ID is required to configure the LocalTieredStorage 
manager.");
+        }
+
+        brokerId = brokerIdInt;
+        logger = new LogContext(format("[LocalTieredStorage Id=%d] ", 
brokerId)).logger(this.getClass());
+
+        if (shouldDeleteOnClose != null) {
+            deleteOnClose = Boolean.parseBoolean(shouldDeleteOnClose);
+        }
+
+        if (isDeleteEnabled != null) {
+            deleteEnabled = Boolean.parseBoolean(isDeleteEnabled);
+        }
+
+        if (transfererClass != null) {
+            try {
+                transferer = (Transferer) 
getClass().getClassLoader().loadClass(transfererClass).newInstance();
+
+            } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException | ClassCastException e) {
+                throw new RuntimeException(format("Cannot create transferer 
from class '%s'", transfererClass), e);
+            }
+        }
+
+
+        if (storageDir == null) {
+            storageDirectory = TestUtils.tempDirectory(ROOT_STORAGES_DIR_NAME 
+ "-");
+
+            logger.debug("No storage directory specified, created temporary 
directory: {}",
+                    storageDirectory.getAbsolutePath());
+
+        } else {
+            storageDirectory = new File(storageDir + "/" + 
ROOT_STORAGES_DIR_NAME);

Review Comment:
   You don't need to specify the "/", since the separator is OS-dependent. The File API takes care of that for you with the File(parent, child) constructor: 
https://docs.oracle.com/javase/8/docs/api/java/io/File.html#File-java.io.File-java.lang.String-
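
   That is, roughly:

   ```java
   // File(String parent, String child) resolves the separator for the host OS.
   storageDirectory = new File(storageDir, ROOT_STORAGES_DIR_NAME);
   ```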



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;

Review Comment:
   Is this the right place for these files? Asking because all the other TS-related files are in `kafka.log.remote`



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.lang.String.format;
+import static java.util.Arrays.stream;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getFileType;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents the set of files offloaded to the local tiered storage for a single log segment.
+ * A {@link RemoteLogSegmentFileset} corresponds to the leaves of the file system structure of
+ * the local tiered storage:
+ *
+ * <code>
+ * / storage-directory / uuidBase64-partition-topic / oAtiIQ95REujbuzNd_lkLQ-segment
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-offset_index
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-time_index
+ * </code>
+ */
+public final class RemoteLogSegmentFileset {
+
+    /**
+     * The format of a file which belongs to the fileset, i.e. a file which is assigned to a log segment in
+     * Kafka's log directory.
+     *
+     * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+     * follows the structure UUID-FileType.
+     */
+    private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";
+    private static final Pattern FILENAME_FORMAT = compile("(" + UUID_LEGAL_CHARS + ")-([a-z_]+)");
+    private static final int GROUP_UUID = 1;
+    private static final int GROUP_FILE_TYPE = 2;
+
+    /**
+     * Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage.
+     */
+    public enum RemoteLogSegmentFileType {
+        SEGMENT(false),
+        OFFSET_INDEX(false),
+        TIME_INDEX(false),
+        TRANSACTION_INDEX(true),
+        LEADER_EPOCH_CHECKPOINT(false),
+        PRODUCER_SNAPSHOT(true);
+
+        private final boolean optional;
+
+        RemoteLogSegmentFileType(boolean optional) {
+            this.optional = optional;
+        }
+
+        /**
+         * Provides the name of the file of this type for the given UUID in the local tiered storage,
+         * e.g. uuid-segment.
+         */
+        public String toFilename(final Uuid uuid) {
+            return format("%s-%s", uuid.toString(), name().toLowerCase(Locale.ROOT));
+        }
+
+        /**
+         * Returns the nature of the data stored in the file with the provided name.
+         */
+        public static RemoteLogSegmentFileType getFileType(final String filename) {
+            try {
+                return RemoteLogSegmentFileType.valueOf(substr(filename, GROUP_FILE_TYPE).toUpperCase(Locale.ROOT));
+
+            } catch (final RuntimeException e) {
+                throw new IllegalArgumentException(format("Not a remote log segment file: %s", filename), e);
+            }
+        }
+
+        /**
+         * Extracts the UUID from the filename. This UUID is that of the remote log segment id, which uniquely
+         * identifies the log segment the file's data belongs to (not necessarily segment data, but also
+         * indexes or other associated files).
+         */
+        public static Uuid getUuid(final String filename) {
+            return Uuid.fromString(substr(filename, GROUP_UUID));
+        }
+
+        static String substr(final String filename, final int group) {
+            final Matcher m = FILENAME_FORMAT.matcher(filename);
+            if (!m.matches()) {
+                throw new IllegalArgumentException(format("Not a remote log segment file: %s", filename));
+            }
+            return m.group(group);
+        }
+
+        public boolean isOptional() {
+            return optional;
+        }
+    }
+
+    private static final Logger LOGGER = getLogger(RemoteLogSegmentFileset.class);
+
+    private final RemoteTopicPartitionDirectory partitionDirectory;
+    private final RemoteLogSegmentId remoteLogSegmentId;
+    private final Map<RemoteLogSegmentFileType, File> files;
+
+    /**
+     * Creates a new fileset located under the given storage directory for the provided remote log segment id.
+     * The topic-partition directory is created if it does not exist yet. However, the files corresponding to
+     * the offloaded log segment are not created on the file system until the transfer happens.
+     *
+     * @param storageDir The root directory of the local tiered storage.
+     * @param id Remote log segment id assigned to a log segment in Kafka.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentId id) {
+
+        final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory(id.topicIdPartition(), storageDir);
+        final File partitionDirectory = tpDir.getDirectory();
+        final Uuid uuid = id.id();
+
+        final Map<RemoteLogSegmentFileType, File> files = stream(RemoteLogSegmentFileType.values())
+                .collect(toMap(identity(), type -> new File(partitionDirectory, type.toFilename(uuid))));
+
+        return new RemoteLogSegmentFileset(tpDir, id, files);
+    }
+
+    /**
+     * Creates a fileset instance for the physical set of files located under the given topic-partition directory.
+     * The fileset MUST exist on the file system with the given uuid.
+     *
+     * @param tpDirectory The topic-partition directory which this fileset's segment belongs to.
+     * @param uuid The expected UUID of the fileset.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openExistingFileset(final RemoteTopicPartitionDirectory tpDirectory,
+                                                              final Uuid uuid) {
+        final Map<RemoteLogSegmentFileType, File> files =
+                stream(tpDirectory.getDirectory().listFiles())
+                        .filter(file -> file.getName().startsWith(uuid.toString()))
+                        .collect(toMap(file -> getFileType(file.getName()), identity()));
+
+        final Set<RemoteLogSegmentFileType> expectedFileTypes = stream(RemoteLogSegmentFileType.values())
+                .filter(x -> !x.isOptional()).collect(Collectors.toSet());
+
+        if (!files.keySet().containsAll(expectedFileTypes)) {
+            expectedFileTypes.removeAll(files.keySet());
+            throw new IllegalStateException(format("Invalid fileset, missing files: %s", expectedFileTypes));
+        }
+
+        final RemoteLogSegmentId id = new RemoteLogSegmentId(tpDirectory.getTopicIdPartition(), uuid);
+        return new RemoteLogSegmentFileset(tpDirectory, id, files);
+    }
+
+    public RemoteLogSegmentId getRemoteLogSegmentId() {
+        return remoteLogSegmentId;
+    }
+
+    public File getFile(final RemoteLogSegmentFileType type) {
+        return files.get(type);
+    }
+
+    public boolean delete() {
+        return deleteFilesOnly(files.values());
+    }
+
+    public List<Record> getRecords() throws IOException {
+        return StreamSupport
+                .stream(FileRecords.open(files.get(SEGMENT)).records().spliterator(), false)
+                .collect(Collectors.toList());
+    }
+
+    public void copy(final Transferer transferer, final LogSegmentData data) throws IOException {
+        transferer.transfer(data.logSegment().toFile(), files.get(SEGMENT));
+        transferer.transfer(data.offsetIndex().toFile(), files.get(OFFSET_INDEX));
+        transferer.transfer(data.timeIndex().toFile(), files.get(TIME_INDEX));
+        if (data.transactionIndex().isPresent()) {
+            transferer.transfer(data.transactionIndex().get().toFile(), files.get(TRANSACTION_INDEX));
+        }
+        transferer.transfer(data.leaderEpochIndex(), files.get(LEADER_EPOCH_CHECKPOINT));
+        transferer.transfer(data.producerSnapshotIndex().toFile(), files.get(PRODUCER_SNAPSHOT));
+    }
+
+    public String toString() {
+        final String ls = files.values().stream()
+                .map(file -> "\t" + file.getName() + "\n")
+                .reduce("", (s1, s2) -> s1 + s2);
+
+        return format("%s/\n%s", partitionDirectory.getDirectory().getName(), ls);
+    }
+
+    public static boolean deleteFilesOnly(final Collection<File> files) {
+        final Optional<File> notAFile = files.stream().filter(f -> f.exists() && !f.isFile()).findAny();
+
+        if (notAFile.isPresent()) {
+            LOGGER.warn(format("Found unexpected directory %s. Will not delete.", notAFile.get().getAbsolutePath()));
+            return false;
+        }
+
+        return files.stream().map(RemoteLogSegmentFileset::deleteQuietly).reduce(true, Boolean::logicalAnd);
+    }
+
+    public static boolean deleteQuietly(final File file) {
+        try {
+            LOGGER.trace("Deleting " + file.getAbsolutePath());
+            if (!file.exists()) {
+                return true;
+            }
+            return file.delete();

Review Comment:
   You can use `Utils.delete(File)` here
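
   For example, a rough sketch of what `deleteQuietly` could look like with it (assuming `org.apache.kafka.common.utils.Utils` is imported, and keeping the boolean result this helper exposes):

   ```java
   public static boolean deleteQuietly(final File file) {
       try {
           LOGGER.trace("Deleting " + file.getAbsolutePath());
           // Utils.delete tolerates a missing file and recursively deletes directories.
           Utils.delete(file);
           return true;
       } catch (final IOException e) {
           LOGGER.warn("Failed to delete " + file.getAbsolutePath(), e);
           return false;
       }
   }
   ```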


