This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 42800d9  feat: implement log file reader for parquet log block (#244)
42800d9 is described below

commit 42800d99cf6361fdfebcd0eb5174e33f3c11170d
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Jan 14 00:50:15 2025 -0600

    feat: implement log file reader for parquet log block (#244)
---
 crates/core/src/error.rs                           |   9 +
 crates/core/src/file_group/base_file.rs            |  26 +-
 crates/core/src/file_group/builder.rs              |  16 +-
 crates/core/src/file_group/log_file/log_block.rs   | 306 +++++++++++++++++++++
 crates/core/src/file_group/log_file/log_format.rs  | 180 ++++++++++++
 crates/core/src/file_group/log_file/mod.rs         | 185 +++++++++++++
 crates/core/src/file_group/log_file/reader.rs      | 278 +++++++++++++++++++
 crates/core/src/file_group/mod.rs                  |  23 +-
 crates/core/src/storage/error.rs                   |   3 +
 crates/core/src/storage/mod.rs                     |  14 +-
 crates/core/src/storage/reader.rs                  |  50 ++++
 crates/core/src/table/fs_view.rs                   |   8 +-
 ...a144b5ea064-0_20250113230302428.log.1_0-188-387 | Bin 0 -> 5311 bytes
 python/hudi/_internal.pyi                          |   4 +-
 python/src/internal.rs                             |   6 +-
 15 files changed, 1063 insertions(+), 45 deletions(-)

diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index bf35058..48db16f 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -42,12 +42,21 @@ pub enum CoreError {
     #[error("{0}")]
     ReadFileSliceError(String),
 
+    #[error("{0}")]
+    LogFormatError(String),
+
+    #[error("{0}")]
+    LogBlockError(String),
+
     #[error("{0}")]
     InvalidPartitionPath(String),
 
     #[error(transparent)]
     ParquetError(#[from] parquet::errors::ParquetError),
 
+    #[error("{0}")]
+    ReadLogFileError(#[from] std::io::Error),
+
     #[error("Storage error: {0}")]
     Storage(#[from] StorageError),
 
diff --git a/crates/core/src/file_group/base_file.rs 
b/crates/core/src/file_group/base_file.rs
index 6f26c85..7c4fcfb 100644
--- a/crates/core/src/file_group/base_file.rs
+++ b/crates/core/src/file_group/base_file.rs
@@ -27,7 +27,7 @@ pub struct BaseFile {
     pub file_name: String,
 
     /// The id of the enclosing file group.
-    pub file_group_id: String,
+    pub file_id: String,
 
     /// The associated instant time of the base file.
     pub instant_time: String,
@@ -37,14 +37,14 @@ pub struct BaseFile {
 }
 
 impl BaseFile {
-    /// Parse file name and extract `file_group_id` and `instant_time`.
+    /// Parse file name and extract `file_id` and `instant_time`.
     fn parse_file_name(file_name: &str) -> Result<(String, String)> {
         let err_msg = format!("Failed to parse file name '{file_name}' for 
base file.");
         let (name, _) = file_name
             .rsplit_once('.')
             .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
         let parts: Vec<&str> = name.split('_').collect();
-        let file_group_id = parts
+        let file_id = parts
             .first()
             .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
             .to_string();
@@ -52,7 +52,7 @@ impl BaseFile {
             .get(2)
             .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
             .to_string();
-        Ok((file_group_id, instant_time))
+        Ok((file_id, instant_time))
     }
 }
 
@@ -60,10 +60,10 @@ impl TryFrom<&str> for BaseFile {
     type Error = CoreError;
 
     fn try_from(file_name: &str) -> Result<Self> {
-        let (file_group_id, instant_time) = Self::parse_file_name(file_name)?;
+        let (file_id, instant_time) = Self::parse_file_name(file_name)?;
         Ok(Self {
             file_name: file_name.to_string(),
-            file_group_id,
+            file_id,
             instant_time,
             file_metadata: None,
         })
@@ -75,10 +75,10 @@ impl TryFrom<FileMetadata> for BaseFile {
 
     fn try_from(metadata: FileMetadata) -> Result<Self> {
         let file_name = metadata.name.clone();
-        let (file_group_id, instant_time) = Self::parse_file_name(&file_name)?;
+        let (file_id, instant_time) = Self::parse_file_name(&file_name)?;
         Ok(Self {
             file_name,
-            file_group_id,
+            file_id,
             instant_time,
             file_metadata: Some(metadata),
         })
@@ -94,10 +94,7 @@ mod tests {
     fn test_create_base_file_from_file_name() {
         let file_name = 
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
         let base_file = BaseFile::try_from(file_name).unwrap();
-        assert_eq!(
-            base_file.file_group_id,
-            "5a226868-2934-4f84-a16f-55124630c68d-0"
-        );
+        assert_eq!(base_file.file_id, 
"5a226868-2934-4f84-a16f-55124630c68d-0");
         assert_eq!(base_file.instant_time, "20240402144910683");
         assert!(base_file.file_metadata.is_none());
     }
@@ -109,10 +106,7 @@ mod tests {
             1024,
         );
         let base_file = BaseFile::try_from(metadata).unwrap();
-        assert_eq!(
-            base_file.file_group_id,
-            "5a226868-2934-4f84-a16f-55124630c68d-0"
-        );
+        assert_eq!(base_file.file_id, 
"5a226868-2934-4f84-a16f-55124630c68d-0");
         assert_eq!(base_file.instant_time, "20240402144910683");
         let file_metadata = base_file.file_metadata.unwrap();
         assert_eq!(file_metadata.size, 1024);
diff --git a/crates/core/src/file_group/builder.rs 
b/crates/core/src/file_group/builder.rs
index 9f50360..680b55d 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -41,7 +41,7 @@ pub fn build_file_groups(commit_metadata: &Map<String, 
Value>) -> Result<HashSet
         let partition = (!partition.is_empty()).then(|| partition.to_string());
 
         for stat in write_stats {
-            let file_group_id = stat
+            let file_id = stat
                 .get("fileId")
                 .and_then(|v| v.as_str())
                 .ok_or_else(|| CoreError::CommitMetadata("Invalid fileId in 
write stats".into()))?;
@@ -57,7 +57,7 @@ pub fn build_file_groups(commit_metadata: &Map<String, 
Value>) -> Result<HashSet
                 .ok_or_else(|| CoreError::CommitMetadata("Invalid file name in 
path".into()))?;
 
             let file_group = FileGroup::new_with_base_file_name(
-                file_group_id.to_string(),
+                file_id.to_string(),
                 partition.clone(),
                 file_name,
             )?;
@@ -80,15 +80,15 @@ pub fn build_replaced_file_groups(
 
     let mut file_groups = HashSet::new();
 
-    for (partition, file_group_ids_value) in partition_to_replaced {
-        let file_group_ids = file_group_ids_value
+    for (partition, file_ids_value) in partition_to_replaced {
+        let file_ids = file_ids_value
             .as_array()
             .ok_or_else(|| CoreError::CommitMetadata("Invalid file group ids 
array".into()))?;
 
         let partition = (!partition.is_empty()).then(|| partition.to_string());
 
-        for file_group_id in file_group_ids {
-            let id = file_group_id
+        for file_id in file_ids {
+            let id = file_id
                 .as_str()
                 .ok_or_else(|| CoreError::CommitMetadata("Invalid file group 
id string".into()))?;
 
@@ -300,7 +300,7 @@ mod tests {
         }
 
         #[test]
-        fn test_invalid_file_group_ids_array() {
+        fn test_invalid_file_ids_array() {
             let metadata: Map<String, Value> = json!({
                 "partitionToReplaceFileIds": {
                     "20": "not_an_array"
@@ -318,7 +318,7 @@ mod tests {
         }
 
         #[test]
-        fn test_invalid_file_group_id_type() {
+        fn test_invalid_file_id_type() {
             let metadata: Map<String, Value> = json!({
                 "partitionToReplaceFileIds": {
                     "20": [123] // number instead of string
diff --git a/crates/core/src/file_group/log_file/log_block.rs 
b/crates/core/src/file_group/log_file/log_block.rs
new file mode 100644
index 0000000..387b709
--- /dev/null
+++ b/crates/core/src/file_group/log_file/log_block.rs
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::file_group::log_file::log_format::LogFormatVersion;
+use crate::Result;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[allow(dead_code)]
+pub enum BlockType {
+    Command = 0,
+    Delete = 1,
+    Corrupted = 2,
+    AvroData = 3,
+    HfileData = 4,
+    ParquetData = 5,
+    CdcData = 6,
+}
+
+impl AsRef<str> for BlockType {
+    fn as_ref(&self) -> &str {
+        match self {
+            BlockType::Command => ":command",
+            BlockType::Delete => ":delete",
+            BlockType::Corrupted => ":corrupted",
+            BlockType::AvroData => "avro",
+            BlockType::HfileData => "hfile",
+            BlockType::ParquetData => "parquet",
+            BlockType::CdcData => "cdc",
+        }
+    }
+}
+
+impl FromStr for BlockType {
+    type Err = CoreError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            ":command" => Ok(BlockType::Command),
+            ":delete" => Ok(BlockType::Delete),
+            ":corrupted" => Ok(BlockType::Corrupted),
+            "avro_data" => Ok(BlockType::AvroData),
+            "hfile" => Ok(BlockType::HfileData),
+            "parquet" => Ok(BlockType::ParquetData),
+            "cdc" => Ok(BlockType::CdcData),
+            _ => Err(CoreError::LogFormatError(format!(
+                "Invalid block type: {s}"
+            ))),
+        }
+    }
+}
+
+impl TryFrom<[u8; 4]> for BlockType {
+    type Error = CoreError;
+
+    fn try_from(value_bytes: [u8; 4]) -> Result<Self, Self::Error> {
+        let value = u32::from_be_bytes(value_bytes);
+        match value {
+            0 => Ok(BlockType::Command),
+            1 => Ok(BlockType::Delete),
+            2 => Ok(BlockType::Corrupted),
+            3 => Ok(BlockType::AvroData),
+            4 => Ok(BlockType::HfileData),
+            5 => Ok(BlockType::ParquetData),
+            6 => Ok(BlockType::CdcData),
+            _ => Err(CoreError::LogFormatError(format!(
+                "Invalid block type: {value}"
+            ))),
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum BlockMetadataType {
+    Header,
+    Footer,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[repr(u32)]
+pub enum BlockMetadataKey {
+    InstantTime = 0,
+    TargetInstantTime = 1,
+    Schema = 2,
+    CommandBlockType = 3,
+    CompactedBlockTimes = 4,
+    RecordPositions = 5,
+    BlockIdentifier = 6,
+}
+
+impl TryFrom<[u8; 4]> for BlockMetadataKey {
+    type Error = CoreError;
+
+    fn try_from(value_bytes: [u8; 4]) -> Result<Self, Self::Error> {
+        let value = u32::from_be_bytes(value_bytes);
+        match value {
+            0 => Ok(Self::InstantTime),
+            1 => Ok(Self::TargetInstantTime),
+            2 => Ok(Self::Schema),
+            3 => Ok(Self::CommandBlockType),
+            4 => Ok(Self::CompactedBlockTimes),
+            5 => Ok(Self::RecordPositions),
+            6 => Ok(Self::BlockIdentifier),
+            _ => Err(CoreError::LogFormatError(format!(
+                "Invalid header key: {value}"
+            ))),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct LogBlock {
+    pub format_version: LogFormatVersion,
+    pub block_type: BlockType,
+    pub header: HashMap<BlockMetadataKey, String>,
+    pub record_batches: Vec<RecordBatch>,
+    pub footer: HashMap<BlockMetadataKey, String>,
+}
+
+impl LogBlock {
+    pub fn decode_content(block_type: &BlockType, content: Vec<u8>) -> 
Result<Vec<RecordBatch>> {
+        match block_type {
+            BlockType::ParquetData => {
+                let record_bytes = Bytes::from(content);
+                let parquet_reader = 
ParquetRecordBatchReader::try_new(record_bytes, 1024)?;
+                let mut batches = Vec::new();
+                for item in parquet_reader {
+                    let batch = item.map_err(CoreError::ArrowError)?;
+                    batches.push(batch);
+                }
+                Ok(batches)
+            }
+            _ => Err(CoreError::LogBlockError(format!(
+                "Unsupported block type: {block_type:?}"
+            ))),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_array::{ArrayRef, Int64Array, StringArray};
+    use arrow_schema::{DataType, Field, Schema};
+    use parquet::arrow::ArrowWriter;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_block_type_as_ref() {
+        assert_eq!(BlockType::Command.as_ref(), ":command");
+        assert_eq!(BlockType::Delete.as_ref(), ":delete");
+        assert_eq!(BlockType::Corrupted.as_ref(), ":corrupted");
+        assert_eq!(BlockType::AvroData.as_ref(), "avro");
+        assert_eq!(BlockType::HfileData.as_ref(), "hfile");
+        assert_eq!(BlockType::ParquetData.as_ref(), "parquet");
+        assert_eq!(BlockType::CdcData.as_ref(), "cdc");
+    }
+
+    #[test]
+    fn test_block_type_from_str() {
+        assert_eq!(BlockType::from_str(":command").unwrap(), 
BlockType::Command);
+        assert_eq!(BlockType::from_str(":delete").unwrap(), BlockType::Delete);
+        assert_eq!(
+            BlockType::from_str(":corrupted").unwrap(),
+            BlockType::Corrupted
+        );
+        assert_eq!(
+            BlockType::from_str("avro_data").unwrap(),
+            BlockType::AvroData
+        );
+        assert_eq!(BlockType::from_str("hfile").unwrap(), 
BlockType::HfileData);
+        assert_eq!(
+            BlockType::from_str("parquet").unwrap(),
+            BlockType::ParquetData
+        );
+        assert_eq!(BlockType::from_str("cdc").unwrap(), BlockType::CdcData);
+
+        // Test invalid block type
+        assert!(BlockType::from_str("invalid").is_err());
+    }
+
+    #[test]
+    fn test_block_type_try_from_bytes() {
+        assert_eq!(
+            BlockType::try_from([0, 0, 0, 0]).unwrap(),
+            BlockType::Command
+        );
+        assert_eq!(
+            BlockType::try_from([0, 0, 0, 1]).unwrap(),
+            BlockType::Delete
+        );
+        assert_eq!(
+            BlockType::try_from([0, 0, 0, 2]).unwrap(),
+            BlockType::Corrupted
+        );
+        assert_eq!(
+            BlockType::try_from([0, 0, 0, 3]).unwrap(),
+            BlockType::AvroData
+        );
+        assert_eq!(
+            BlockType::try_from([0, 0, 0, 4]).unwrap(),
+            BlockType::HfileData
+        );
+        assert_eq!(
+            BlockType::try_from([0, 0, 0, 5]).unwrap(),
+            BlockType::ParquetData
+        );
+        assert_eq!(
+            BlockType::try_from([0, 0, 0, 6]).unwrap(),
+            BlockType::CdcData
+        );
+
+        // Test invalid block type
+        assert!(BlockType::try_from([0, 0, 0, 7]).is_err());
+    }
+
+    #[test]
+    fn test_block_metadata_key_try_from_bytes() {
+        assert_eq!(
+            BlockMetadataKey::try_from([0, 0, 0, 0]).unwrap(),
+            BlockMetadataKey::InstantTime
+        );
+        assert_eq!(
+            BlockMetadataKey::try_from([0, 0, 0, 1]).unwrap(),
+            BlockMetadataKey::TargetInstantTime
+        );
+        assert_eq!(
+            BlockMetadataKey::try_from([0, 0, 0, 2]).unwrap(),
+            BlockMetadataKey::Schema
+        );
+        assert_eq!(
+            BlockMetadataKey::try_from([0, 0, 0, 3]).unwrap(),
+            BlockMetadataKey::CommandBlockType
+        );
+        assert_eq!(
+            BlockMetadataKey::try_from([0, 0, 0, 4]).unwrap(),
+            BlockMetadataKey::CompactedBlockTimes
+        );
+        assert_eq!(
+            BlockMetadataKey::try_from([0, 0, 0, 5]).unwrap(),
+            BlockMetadataKey::RecordPositions
+        );
+        assert_eq!(
+            BlockMetadataKey::try_from([0, 0, 0, 6]).unwrap(),
+            BlockMetadataKey::BlockIdentifier
+        );
+
+        // Test invalid metadata key
+        assert!(BlockMetadataKey::try_from([0, 0, 0, 7]).is_err());
+    }
+
+    #[test]
+    fn test_decode_parquet_content() -> Result<()> {
+        // Create sample parquet bytes
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let ids = Int64Array::from(vec![1, 2, 3]);
+        let names = StringArray::from(vec!["a", "b", "c"]);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
+        )?;
+
+        let mut buf = Vec::new();
+        {
+            let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
+            writer.write(&batch)?;
+            writer.close()?;
+        }
+
+        // Test decoding the parquet content
+        let batches = LogBlock::decode_content(&BlockType::ParquetData, buf)?;
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 3);
+
+        // Test decoding with unsupported block type
+        assert!(LogBlock::decode_content(&BlockType::AvroData, 
vec![]).is_err());
+
+        Ok(())
+    }
+}
diff --git a/crates/core/src/file_group/log_file/log_format.rs 
b/crates/core/src/file_group/log_file/log_format.rs
new file mode 100644
index 0000000..290d668
--- /dev/null
+++ b/crates/core/src/file_group/log_file/log_format.rs
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::Result;
+
+pub const MAGIC: &[u8] = b"#HUDI#";
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[repr(u32)]
+pub enum LogFormatVersion {
+    V0 = 0,
+    V1 = 1,
+    V2 = 2,
+    V3 = 3,
+}
+
+impl TryFrom<[u8; 4]> for LogFormatVersion {
+    type Error = CoreError;
+
+    fn try_from(value_bytes: [u8; 4]) -> Result<Self, Self::Error> {
+        let value = u32::from_be_bytes(value_bytes);
+        Self::try_from(value)
+    }
+}
+
+impl TryFrom<u32> for LogFormatVersion {
+    type Error = CoreError;
+
+    fn try_from(value: u32) -> std::result::Result<Self, Self::Error> {
+        match value {
+            0 => Ok(Self::V0),
+            1 => Ok(Self::V1),
+            2 => Ok(Self::V2),
+            3 => Ok(Self::V3),
+            _ => Err(CoreError::LogFormatError(format!(
+                "Invalid log format version: {value}"
+            ))),
+        }
+    }
+}
+
+impl LogFormatVersion {
+    #[inline]
+    pub fn has_block_type(&self) -> bool {
+        !matches!(self, LogFormatVersion::V0)
+    }
+
+    #[inline]
+    pub fn has_header(&self) -> bool {
+        !matches!(self, LogFormatVersion::V0)
+    }
+
+    #[inline]
+    pub fn has_content_length(&self) -> bool {
+        !matches!(self, LogFormatVersion::V0)
+    }
+
+    #[inline]
+    pub fn has_footer(&self) -> bool {
+        matches!(self, LogFormatVersion::V1)
+    }
+
+    #[inline]
+    pub fn has_total_log_block_length(&self) -> bool {
+        matches!(self, LogFormatVersion::V1)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_log_format_version_from_bytes() {
+        // Test valid versions
+        assert_eq!(
+            LogFormatVersion::try_from([0, 0, 0, 0]).unwrap(),
+            LogFormatVersion::V0
+        );
+        assert_eq!(
+            LogFormatVersion::try_from([0, 0, 0, 1]).unwrap(),
+            LogFormatVersion::V1
+        );
+        assert_eq!(
+            LogFormatVersion::try_from([0, 0, 0, 2]).unwrap(),
+            LogFormatVersion::V2
+        );
+        assert_eq!(
+            LogFormatVersion::try_from([0, 0, 0, 3]).unwrap(),
+            LogFormatVersion::V3
+        );
+
+        // Test invalid version
+        let err = LogFormatVersion::try_from([0, 0, 0, 4]).unwrap_err();
+        assert!(matches!(err, CoreError::LogFormatError(_)));
+        assert!(err.to_string().contains("Invalid log format version: 4"));
+    }
+
+    #[test]
+    fn test_log_format_version_from_u32() {
+        // Test valid versions
+        assert_eq!(
+            LogFormatVersion::try_from(0u32).unwrap(),
+            LogFormatVersion::V0
+        );
+        assert_eq!(
+            LogFormatVersion::try_from(1u32).unwrap(),
+            LogFormatVersion::V1
+        );
+        assert_eq!(
+            LogFormatVersion::try_from(2u32).unwrap(),
+            LogFormatVersion::V2
+        );
+        assert_eq!(
+            LogFormatVersion::try_from(3u32).unwrap(),
+            LogFormatVersion::V3
+        );
+
+        // Test invalid version
+        let err = LogFormatVersion::try_from(4u32).unwrap_err();
+        assert!(matches!(err, CoreError::LogFormatError(_)));
+        assert!(err.to_string().contains("Invalid log format version: 4"));
+    }
+
+    #[test]
+    fn test_version_feature_flags_v0() {
+        let version = LogFormatVersion::V0;
+        assert!(!version.has_block_type());
+        assert!(!version.has_header());
+        assert!(!version.has_content_length());
+        assert!(!version.has_footer());
+        assert!(!version.has_total_log_block_length());
+    }
+
+    #[test]
+    fn test_version_feature_flags_v1() {
+        let version = LogFormatVersion::V1;
+        assert!(version.has_block_type());
+        assert!(version.has_header());
+        assert!(version.has_content_length());
+        assert!(version.has_footer());
+        assert!(version.has_total_log_block_length());
+    }
+
+    #[test]
+    fn test_version_feature_flags_v2() {
+        let version = LogFormatVersion::V2;
+        assert!(version.has_block_type());
+        assert!(version.has_header());
+        assert!(version.has_content_length());
+        assert!(!version.has_footer());
+        assert!(!version.has_total_log_block_length());
+    }
+
+    #[test]
+    fn test_version_feature_flags_v3() {
+        let version = LogFormatVersion::V3;
+        assert!(version.has_block_type());
+        assert!(version.has_header());
+        assert!(version.has_content_length());
+        assert!(!version.has_footer());
+        assert!(!version.has_total_log_block_length());
+    }
+}
diff --git a/crates/core/src/file_group/log_file/mod.rs 
b/crates/core/src/file_group/log_file/mod.rs
new file mode 100644
index 0000000..46dd403
--- /dev/null
+++ b/crates/core/src/file_group/log_file/mod.rs
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::Result;
+use std::str::FromStr;
+
+mod log_block;
+mod log_format;
+pub mod reader;
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct LogFile {
+    file_id: String,
+    base_commit_timestamp: String,
+    log_file_extension: String,
+    log_file_version: String,
+    file_write_token: String,
+}
+
+const LOG_FILE_PREFIX: char = '.';
+
+impl FromStr for LogFile {
+    type Err = CoreError;
+
+    /// Parse a log file name into a [LogFile].
+    ///
+    /// File name format:
+    ///
+    /// ```text
+    /// .[File Id]_[Base Commit Timestamp].[Log File Extension].[Log File 
Version]_[File Write Token]
+    /// ```
+    fn from_str(file_name: &str) -> Result<Self, Self::Err> {
+        let err_msg = format!("Failed to parse file name '{file_name}' for log 
file.");
+
+        if !file_name.starts_with(LOG_FILE_PREFIX) {
+            return Err(CoreError::FileGroup(err_msg));
+        }
+
+        let file_name = &file_name[LOG_FILE_PREFIX.len_utf8()..];
+
+        let (file_id, rest) = file_name
+            .split_once('_')
+            .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
+
+        let (middle, file_write_token) = rest
+            .rsplit_once('_')
+            .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
+
+        let parts: Vec<&str> = middle.split('.').collect();
+        if parts.len() != 3 {
+            return Err(CoreError::FileGroup(err_msg.clone()));
+        }
+
+        let base_commit_timestamp = parts[0];
+        let log_file_extension = parts[1];
+        let log_file_version = parts[2];
+
+        if file_id.is_empty()
+            || base_commit_timestamp.is_empty()
+            || log_file_extension.is_empty()
+            || log_file_version.is_empty()
+            || file_write_token.is_empty()
+        {
+            return Err(CoreError::FileGroup(err_msg.clone()));
+        }
+
+        Ok(LogFile {
+            file_id: file_id.to_string(),
+            base_commit_timestamp: base_commit_timestamp.to_string(),
+            log_file_extension: log_file_extension.to_string(),
+            log_file_version: log_file_version.to_string(),
+            file_write_token: file_write_token.to_string(),
+        })
+    }
+}
+
+impl LogFile {
+    /// Returns the file name of the log file.
+    #[inline]
+    pub fn file_name(&self) -> String {
+        format!(
+            
"{prefix}{file_id}_{base_commit_timestamp}.{log_file_extension}.{log_file_version}_{file_write_token}",
+            prefix = LOG_FILE_PREFIX,
+            file_id = self.file_id,
+            base_commit_timestamp = self.base_commit_timestamp,
+            log_file_extension = self.log_file_extension,
+            log_file_version = self.log_file_version,
+            file_write_token = self.file_write_token
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_valid_filename_parsing() {
+        let filename = 
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115";
+        let log_file = LogFile::from_str(filename).unwrap();
+
+        assert_eq!(log_file.file_id, "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0");
+        assert_eq!(log_file.base_commit_timestamp, "20250109233025121");
+        assert_eq!(log_file.log_file_extension, "log");
+        assert_eq!(log_file.log_file_version, "1");
+        assert_eq!(log_file.file_write_token, "0-51-115");
+    }
+
+    #[test]
+    fn test_filename_reconstruction() {
+        let original = 
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115";
+        let log_file = LogFile::from_str(original).unwrap();
+        assert_eq!(log_file.file_name(), original);
+    }
+
+    #[test]
+    fn test_missing_dot_prefix() {
+        let filename = "myfile_20250109233025121.log.v1_abc123";
+        assert!(matches!(
+            LogFile::from_str(filename),
+            Err(CoreError::FileGroup(_))
+        ));
+    }
+
+    #[test]
+    fn test_missing_first_underscore() {
+        let filename = ".myfile20250109233025121.log.v1_abc123";
+        assert!(matches!(
+            LogFile::from_str(filename),
+            Err(CoreError::FileGroup(_))
+        ));
+    }
+
+    #[test]
+    fn test_missing_last_underscore() {
+        let filename = ".myfile_20250109233025121.log.v1abc123";
+        assert!(matches!(
+            LogFile::from_str(filename),
+            Err(CoreError::FileGroup(_))
+        ));
+    }
+
+    #[test]
+    fn test_incorrect_dot_parts() {
+        let filename = ".myfile_20250109233025121.log.v1.extra_abc123";
+        assert!(matches!(
+            LogFile::from_str(filename),
+            Err(CoreError::FileGroup(_))
+        ));
+    }
+
+    #[test]
+    fn test_empty_components() {
+        let filenames = vec![
+            "._20250109233025121.log.v1_abc123",     // empty file_id
+            ".myfile_.log.v1_abc123",                // empty timestamp
+            ".myfile_20250109233025121..v1_abc123",  // empty extension
+            ".myfile_20250109233025121.log._abc123", // empty version
+            ".myfile_20250109233025121.log.v1_",     // empty token
+        ];
+
+        for filename in filenames {
+            assert!(matches!(
+                LogFile::from_str(filename),
+                Err(CoreError::FileGroup(_))
+            ));
+        }
+    }
+}
diff --git a/crates/core/src/file_group/log_file/reader.rs 
b/crates/core/src/file_group/log_file/reader.rs
new file mode 100644
index 0000000..84b848b
--- /dev/null
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::file_group::log_file::log_block::{
+    BlockMetadataKey, BlockMetadataType, BlockType, LogBlock,
+};
+use crate::file_group::log_file::log_format::{LogFormatVersion, MAGIC};
+use crate::storage::reader::StorageReader;
+use crate::storage::Storage;
+use crate::Result;
+use bytes::BytesMut;
+use std::collections::HashMap;
+use std::io::{self, Read, Seek};
+use std::sync::Arc;
+
+pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024 * 1024;
+
+#[allow(dead_code)]
+#[derive(Debug)]
+pub struct LogFileReader<R> {
+    storage: Arc<Storage>,
+    reader: R,
+    buffer: BytesMut,
+}
+
+impl LogFileReader<StorageReader> {
+    pub async fn new(storage: Arc<Storage>, relative_path: &str) -> 
Result<Self> {
+        let reader = storage.get_storage_reader(relative_path).await?;
+        Ok(Self {
+            storage,
+            reader,
+            buffer: BytesMut::with_capacity(DEFAULT_BUFFER_SIZE),
+        })
+    }
+
+    pub fn read_all_blocks(mut self) -> Result<Vec<LogBlock>> {
+        let mut blocks = Vec::new();
+        while let Some(block) = self.read_next_block()? {
+            blocks.push(block);
+        }
+        Ok(blocks)
+    }
+}
+
+impl<R: Read + Seek> LogFileReader<R> {
+    /// Read [`MAGIC`] from the log file.
+    ///
+    /// Returns `Ok(true)` if the magic bytes are read successfully.
+    ///
+    /// Returns `Ok(false)` if the end of the file is reached.
+    ///
+    /// Returns an error if the magic bytes are invalid or an I/O error occurs.
+    fn read_magic(&mut self) -> Result<bool> {
+        let mut magic = [0u8; 6];
+        match self.reader.read_exact(&mut magic) {
+            Ok(_) => {
+                if magic != MAGIC {
+                    return Err(CoreError::LogFormatError("Invalid 
magic".to_string()));
+                }
+                Ok(true)
+            }
+            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
+            Err(e) => Err(CoreError::ReadLogFileError(e)),
+        }
+    }
+
+    /// Read 8 bytes for the log block's length excluding the magic.
+    fn read_block_length(&mut self) -> Result<u64> {
+        let mut size_buf = [0u8; 8];
+        self.reader.read_exact(&mut size_buf)?;
+        Ok(u64::from_be_bytes(size_buf))
+    }
+
+    fn create_corrupted_block_if_needed(
+        &mut self,
+        _curent_pos: u64,
+        _block_length: Option<u64>,
+    ) -> Option<LogBlock> {
+        // TODO: support creating corrupted block
+        None
+    }
+
+    fn read_block_length_or_corrupted_block(
+        &mut self,
+        start_pos: u64,
+    ) -> Result<(u64, Option<LogBlock>)> {
+        match self.read_block_length() {
+            Ok(length) => {
+                if let Some(block) = 
self.create_corrupted_block_if_needed(start_pos, Some(length))
+                {
+                    Ok((0, Some(block)))
+                } else {
+                    Ok((length, None))
+                }
+            }
+            Err(e) => Err(e),
+        }
+    }
+
+    /// Read 4 bytes for [`LogFormatVersion`].
+    fn read_format_version(&mut self) -> Result<LogFormatVersion> {
+        let mut version_buf = [0u8; 4];
+        self.reader.read_exact(&mut version_buf)?;
+        LogFormatVersion::try_from(version_buf)
+    }
+
+    /// Read 4 bytes for [`BlockType`].
+    fn read_block_type(&mut self, format_version: &LogFormatVersion) -> 
Result<BlockType> {
+        if !format_version.has_block_type() {
+            return Err(CoreError::LogFormatError(
+                "Block type is not supported".to_string(),
+            ));
+        }
+        let mut type_buf = [0u8; 4];
+        self.reader.read_exact(&mut type_buf)?;
+        BlockType::try_from(type_buf)
+    }
+
+    /// First, read 4 bytes for the number of entries in the metadata section 
(header or footer).
+    ///
+    /// Then for each entry,
+    /// 1. Read 4 bytes for the key
+    /// 2. Read 4 bytes for the value's length
+    /// 3. Read the bytes of the length for the value
+    ///
+    /// See also [`BlockMetadataKey`].
+    fn read_block_metadata(
+        &mut self,
+        metadata_type: BlockMetadataType,
+        format_version: &LogFormatVersion,
+    ) -> Result<HashMap<BlockMetadataKey, String>> {
+        match metadata_type {
+            BlockMetadataType::Header if !format_version.has_header() => {
+                return Ok(HashMap::new());
+            }
+            BlockMetadataType::Footer if !format_version.has_footer() => {
+                return Ok(HashMap::new());
+            }
+            _ => {}
+        }
+        let mut num_entries_buf = [0u8; 4];
+        self.reader.read_exact(&mut num_entries_buf)?;
+        let num_entries = u32::from_be_bytes(num_entries_buf);
+        let mut metadata: HashMap<BlockMetadataKey, String> =
+            HashMap::with_capacity(num_entries as usize);
+        for _ in 0..num_entries {
+            let mut key_buf = [0u8; 4];
+            self.reader.read_exact(&mut key_buf)?;
+            let key = BlockMetadataKey::try_from(key_buf)?;
+            let mut value_len_buf = [0u8; 4];
+            self.reader.read_exact(&mut value_len_buf)?;
+            let value_len = u32::from_be_bytes(value_len_buf);
+            let mut value_buf = vec![0u8; value_len as usize];
+            self.reader.read_exact(&mut value_buf)?;
+            let value =
+                String::from_utf8(value_buf).map_err(|e| 
CoreError::Utf8Error(e.utf8_error()))?;
+            metadata.insert(key, value);
+        }
+        Ok(metadata)
+    }
+
+    /// Read the content of the log block.
+    fn read_content(
+        &mut self,
+        format_version: &LogFormatVersion,
+        fallback_length: u64,
+    ) -> Result<Vec<u8>> {
+        let content_length = if format_version.has_content_length() {
+            let mut content_length_buf = [0u8; 8];
+            self.reader.read_exact(&mut content_length_buf)?;
+            u64::from_be_bytes(content_length_buf)
+        } else {
+            fallback_length
+        };
+        let mut content_buf = vec![0u8; content_length as usize];
+        self.reader.read_exact(&mut content_buf)?;
+        Ok(content_buf)
+    }
+
+    /// Read 8 bytes for the total length of the log block.
+    fn read_total_block_length(
+        &mut self,
+        format_version: &LogFormatVersion,
+    ) -> Result<Option<u64>> {
+        if !format_version.has_total_log_block_length() {
+            return Ok(None);
+        }
+        let mut size_buf = [0u8; 8];
+        self.reader.read_exact(&mut size_buf)?;
+        Ok(Some(u64::from_be_bytes(size_buf)))
+    }
+
+    fn read_next_block(&mut self) -> Result<Option<LogBlock>> {
+        if !self.read_magic()? {
+            return Ok(None);
+        }
+
+        let curr_pos = self
+            .reader
+            .stream_position()
+            .map_err(CoreError::ReadLogFileError)?;
+
+        let (block_length, _) = 
self.read_block_length_or_corrupted_block(curr_pos)?;
+        let format_version = self.read_format_version()?;
+        let block_type = self.read_block_type(&format_version)?;
+        let header = self.read_block_metadata(BlockMetadataType::Header, 
&format_version)?;
+        let content = self.read_content(&format_version, block_length)?;
+        let record_batches = LogBlock::decode_content(&block_type, content)?;
+        let footer = self.read_block_metadata(BlockMetadataType::Footer, 
&format_version)?;
+        let _ = self.read_total_block_length(&format_version)?;
+
+        Ok(Some(LogBlock {
+            format_version,
+            block_type,
+            header,
+            record_batches,
+            footer,
+        }))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::fs::canonicalize;
+    use std::path::PathBuf;
+    use url::Url;
+
+    fn get_sample_log_file() -> (PathBuf, String) {
+        let dir = PathBuf::from("tests/data/log_files/valid_log_parquet");
+        (
+            canonicalize(dir).unwrap(),
+            
".ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387".to_string(),
+        )
+    }
+
+    #[tokio::test]
+    async fn test_read_sample_log_file() {
+        let (dir, file_name) = get_sample_log_file();
+        let dir_url = Url::from_directory_path(dir).unwrap();
+        let storage = Storage::new_with_base_url(dir_url).unwrap();
+        let reader = LogFileReader::new(storage, &file_name).await.unwrap();
+        let blocks = reader.read_all_blocks().unwrap();
+        assert_eq!(blocks.len(), 1);
+
+        let block = &blocks[0];
+        assert_eq!(block.format_version, LogFormatVersion::V1);
+        assert_eq!(block.block_type, BlockType::ParquetData);
+        assert_eq!(block.header.len(), 2);
+        assert_eq!(
+            block.header.get(&BlockMetadataKey::InstantTime).unwrap(),
+            "20250113230424191"
+        );
+        assert!(block.header.contains_key(&BlockMetadataKey::Schema));
+        assert_eq!(block.footer.len(), 0);
+
+        let batches = block.record_batches.as_slice();
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 1);
+    }
+}
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index 8c92f4e..fce9786 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -22,6 +22,7 @@
 
 pub mod base_file;
 pub mod builder;
+pub mod log_file;
 pub mod reader;
 
 use crate::error::CoreError;
@@ -65,8 +66,8 @@ impl FileSlice {
 
     /// Returns the enclosing [FileGroup]'s id.
     #[inline]
-    pub fn file_group_id(&self) -> &str {
-        &self.base_file.file_group_id
+    pub fn file_id(&self) -> &str {
+        &self.base_file.file_id
     }
 
     /// Returns the partition path of the [FileSlice].
@@ -101,14 +102,14 @@ impl FileSlice {
 /// Hudi File Group.
 #[derive(Clone, Debug)]
 pub struct FileGroup {
-    pub id: String,
+    pub file_id: String,
     pub partition_path: Option<String>,
     pub file_slices: BTreeMap<String, FileSlice>,
 }
 
 impl PartialEq for FileGroup {
     fn eq(&self, other: &Self) -> bool {
-        self.id == other.id && self.partition_path == other.partition_path
+        self.file_id == other.file_id && self.partition_path == 
other.partition_path
     }
 }
 
@@ -116,7 +117,7 @@ impl Eq for FileGroup {}
 
 impl Hash for FileGroup {
     fn hash<H: Hasher>(&self, state: &mut H) {
-        self.id.hash(state);
+        self.file_id.hash(state);
         self.partition_path.hash(state);
     }
 }
@@ -126,7 +127,7 @@ impl fmt::Display for FileGroup {
         f.write_str(
             format!(
                 "File Group: partition {:?} id {}",
-                &self.partition_path, &self.id
+                &self.partition_path, &self.file_id
             )
             .as_str(),
         )
@@ -134,9 +135,9 @@ impl fmt::Display for FileGroup {
 }
 
 impl FileGroup {
-    pub fn new(id: String, partition_path: Option<String>) -> Self {
+    pub fn new(file_id: String, partition_path: Option<String>) -> Self {
         Self {
-            id,
+            file_id,
             partition_path,
             file_slices: BTreeMap::new(),
         }
@@ -162,7 +163,7 @@ impl FileGroup {
         if self.file_slices.contains_key(instant_time) {
             Err(CoreError::FileGroup(format!(
                 "Instant time {instant_time} is already present in File Group 
{}",
-                self.id
+                self.file_id
             )))
         } else {
             self.file_slices.insert(
@@ -236,7 +237,7 @@ mod tests {
     #[test]
     fn test_file_group_display() {
         let file_group = FileGroup {
-            id: "group123".to_string(),
+            file_id: "group123".to_string(),
             partition_path: Some("part/2023-01-01".to_string()),
             file_slices: BTreeMap::new(),
         };
@@ -249,7 +250,7 @@ mod tests {
         );
 
         let file_group_no_partition = FileGroup {
-            id: "group456".to_string(),
+            file_id: "group456".to_string(),
             partition_path: None,
             file_slices: BTreeMap::new(),
         };
diff --git a/crates/core/src/storage/error.rs b/crates/core/src/storage/error.rs
index 09c749d..a29dd6a 100644
--- a/crates/core/src/storage/error.rs
+++ b/crates/core/src/storage/error.rs
@@ -37,6 +37,9 @@ pub enum StorageError {
     #[error(transparent)]
     ObjectStorePathError(#[from] object_store::path::Error),
 
+    #[error(transparent)]
+    ReaderError(#[from] std::io::Error),
+
     #[error(transparent)]
     ParquetError(#[from] parquet::errors::ParquetError),
 
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 4f5aec3..85f83b9 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -36,13 +36,15 @@ use url::Url;
 
 use crate::config::table::HudiTableConfig;
 use crate::config::HudiConfigs;
-use crate::storage::error::Result;
 use crate::storage::error::StorageError::{Creation, InvalidPath};
+use crate::storage::error::{Result, StorageError};
 use crate::storage::file_metadata::FileMetadata;
+use crate::storage::reader::StorageReader;
 use crate::storage::util::join_url_segments;
 
 pub mod error;
 pub mod file_metadata;
+pub mod reader;
 pub mod util;
 
 #[allow(dead_code)]
@@ -191,6 +193,16 @@ impl Storage {
         Ok(concat_batches(&schema, &batches)?)
     }
 
+    pub async fn get_storage_reader(&self, relative_path: &str) -> 
Result<StorageReader> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let obj_store = self.object_store.clone();
+        let obj_meta = obj_store.head(&obj_path).await?;
+        StorageReader::new(obj_store, obj_meta)
+            .await
+            .map_err(StorageError::ReaderError)
+    }
+
     pub async fn list_dirs(&self, subdir: Option<&str>) -> Result<Vec<String>> 
{
         let dir_paths = self.list_dirs_as_obj_paths(subdir).await?;
         let mut dirs = Vec::new();
diff --git a/crates/core/src/storage/reader.rs 
b/crates/core/src/storage/reader.rs
new file mode 100644
index 0000000..b012316
--- /dev/null
+++ b/crates/core/src/storage/reader.rs
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use bytes::Bytes;
+use object_store::{ObjectMeta, ObjectStore};
+use std::io::{BufReader, Cursor, Result};
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub struct StorageReader {
+    reader: BufReader<Cursor<Bytes>>,
+}
+
+impl StorageReader {
+    pub async fn new(object_store: Arc<dyn ObjectStore>, object_meta: 
ObjectMeta) -> Result<Self> {
+        let get_result = object_store.get(&object_meta.location).await?;
+        // TODO change to use stream
+        let bytes = get_result.bytes().await?;
+        let reader = BufReader::with_capacity(bytes.len(), Cursor::new(bytes));
+        Ok(Self { reader })
+    }
+}
+
+impl Read for StorageReader {
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        self.reader.read(buf)
+    }
+}
+
+impl Seek for StorageReader {
+    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+        self.reader.seek(pos)
+    }
+}
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 4e6aa1d..455e823 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -102,7 +102,7 @@ impl FileSystemView {
         let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
         for metadata in file_metadata {
             let base_file = BaseFile::try_from(metadata)?;
-            let fg_id = &base_file.file_group_id;
+            let fg_id = &base_file.file_id;
             fg_id_to_base_files
                 .entry(fg_id.to_owned())
                 .or_default()
@@ -257,7 +257,7 @@ mod tests {
         assert_eq!(file_slices.len(), 1);
         let fg_ids = file_slices
             .iter()
-            .map(|fsl| fsl.file_group_id())
+            .map(|fsl| fsl.file_id())
             .collect::<Vec<_>>();
         assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
         for fsl in file_slices.iter() {
@@ -286,7 +286,7 @@ mod tests {
         assert_eq!(file_slices.len(), 1);
         let fg_ids = file_slices
             .iter()
-            .map(|fsl| fsl.file_group_id())
+            .map(|fsl| fsl.file_id())
             .collect::<Vec<_>>();
         assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
         for fsl in file_slices.iter() {
@@ -327,7 +327,7 @@ mod tests {
 
         let fg_ids = file_slices
             .iter()
-            .map(|fsl| fsl.file_group_id())
+            .map(|fsl| fsl.file_id())
             .collect::<Vec<_>>();
         assert_eq!(fg_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
         for fsl in file_slices.iter() {
diff --git 
a/crates/core/tests/data/log_files/valid_log_parquet/.ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387
 
b/crates/core/tests/data/log_files/valid_log_parquet/.ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387
new file mode 100644
index 0000000..f0f4d14
Binary files /dev/null and 
b/crates/core/tests/data/log_files/valid_log_parquet/.ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387
 differ
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index d5e046f..eb4b110 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -60,7 +60,7 @@ class HudiFileSlice:
     the partition it belongs to, and associated metadata.
 
     Attributes:
-        file_group_id (str): The id of the file group this file slice belongs 
to.
+        file_id (str): The id of the file group this file slice belongs to.
         partition_path (str): The path of the partition containing this file 
slice.
         creation_instant_time (str): The creation instant time of this file 
slice.
         base_file_name (str): The name of the base file.
@@ -69,7 +69,7 @@ class HudiFileSlice:
         num_records (int): The number of records in the file slice.
     """
 
-    file_group_id: str
+    file_id: str
     partition_path: str
     creation_instant_time: str
     base_file_name: str
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 4c9b7fd..e11b21a 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -90,7 +90,7 @@ impl HudiFileGroupReader {
 #[pyclass]
 pub struct HudiFileSlice {
     #[pyo3(get)]
-    file_group_id: String,
+    file_id: String,
     #[pyo3(get)]
     partition_path: String,
     #[pyo3(get)]
@@ -125,7 +125,7 @@ impl HudiFileSlice {
 
 #[cfg(not(tarpaulin))]
 fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
-    let file_group_id = f.file_group_id().to_string();
+    let file_id = f.file_id().to_string();
     let partition_path = f.partition_path().to_string();
     let creation_instant_time = f.creation_instant_time().to_string();
     let base_file_name = f.base_file.file_name.clone();
@@ -134,7 +134,7 @@ fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
     let base_file_byte_size = file_metadata.byte_size;
     let num_records = file_metadata.num_records;
     HudiFileSlice {
-        file_group_id,
+        file_id,
         partition_path,
         creation_instant_time,
         base_file_name,

Reply via email to