This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 3dc5fe2  feat: add APIs for MOR snapshot reads (#247)
3dc5fe2 is described below

commit 3dc5fe239c8038d61567e8085828f98eba1c0584
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 17 23:59:27 2025 -0600

    feat: add APIs for MOR snapshot reads (#247)
    
    - Make `FileGroup` and `FileSlice` support adding and reading log files
    - Perform snapshot read using `FileGroupReader`, which merges log files 
with base files when required
    - Make `BaseFile` and `LogFile` APIs more ergonomic
---
 crates/core/src/config/table.rs                    |   3 +
 crates/core/src/file_group/base_file.rs            |  88 ++++++---
 crates/core/src/file_group/file_slice.rs           |  97 ++++++++++
 crates/core/src/file_group/log_file/log_block.rs   |   1 +
 crates/core/src/file_group/log_file/mod.rs         | 169 +++++++++++++---
 crates/core/src/file_group/log_file/reader.rs      |  14 +-
 crates/core/src/file_group/mod.rs                  | 123 +++++-------
 crates/core/src/file_group/reader.rs               | 104 ++++++++--
 crates/core/src/storage/mod.rs                     |  13 +-
 crates/core/src/table/builder.rs                   |  13 +-
 crates/core/src/table/fs_view.rs                   | 122 ++++++++----
 crates/core/src/table/mod.rs                       | 215 +++++++++++++++------
 crates/core/src/table/partition.rs                 |   2 +
 crates/core/src/timeline/instant.rs                |  37 ++--
 crates/core/src/timeline/mod.rs                    |  87 ++++++---
 crates/core/src/timeline/selector.rs               |  15 +-
 crates/datafusion/src/lib.rs                       |  24 +--
 .../{ => cow}/v6_complexkeygen_hivestyle.sql       |   0
 .../{ => cow}/v6_complexkeygen_hivestyle.zip       | Bin
 crates/tests/data/tables/{ => cow}/v6_empty.sql    |   0
 crates/tests/data/tables/{ => cow}/v6_empty.zip    | Bin
 .../data/tables/{ => cow}/v6_nonpartitioned.sql    |   0
 .../data/tables/{ => cow}/v6_nonpartitioned.zip    | Bin
 .../v6_simplekeygen_hivestyle_no_metafields.sql    |   0
 .../v6_simplekeygen_hivestyle_no_metafields.zip    | Bin
 .../{ => cow}/v6_simplekeygen_nonhivestyle.sql     |   0
 .../{ => cow}/v6_simplekeygen_nonhivestyle.zip     | Bin
 ...v6_simplekeygen_nonhivestyle_overwritetable.sql |   0
 ...v6_simplekeygen_nonhivestyle_overwritetable.zip | Bin
 .../{ => cow}/v6_timebasedkeygen_nonhivestyle.sql  |   0
 .../{ => cow}/v6_timebasedkeygen_nonhivestyle.zip  | Bin
 .../{ => mor}/v6_complexkeygen_hivestyle.sql       |   9 +-
 .../data/tables/mor/v6_complexkeygen_hivestyle.zip | Bin 0 -> 42137 bytes
 crates/tests/data/tables/{ => mor}/v6_empty.sql    |   2 +-
 crates/tests/data/tables/mor/v6_empty.zip          | Bin 0 -> 5885 bytes
 .../data/tables/{ => mor}/v6_nonpartitioned.sql    |   8 +-
 crates/tests/data/tables/mor/v6_nonpartitioned.zip | Bin 0 -> 28459 bytes
 crates/tests/src/lib.rs                            |  34 ++--
 demo/app/python/src/main.py                        |  60 +++---
 demo/app/rust/src/main.rs                          |   6 +-
 demo/infra/mc/prepare_data.sh                      |   9 +-
 python/src/internal.rs                             |   4 +-
 42 files changed, 887 insertions(+), 372 deletions(-)

diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 7101971..9915626 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -146,6 +146,9 @@ impl ConfigParser for HudiTableConfig {
 
     fn default_value(&self) -> Option<Self::Output> {
         match self {
+            Self::BaseFileFormat => Some(HudiConfigValue::String(
+                BaseFileFormatValue::Parquet.as_ref().to_string(),
+            )),
             Self::DatabaseName => 
Some(HudiConfigValue::String("default".to_string())),
             Self::DropsPartitionFields => 
Some(HudiConfigValue::Boolean(false)),
             Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
diff --git a/crates/core/src/file_group/base_file.rs 
b/crates/core/src/file_group/base_file.rs
index 7c4fcfb..8d51969 100644
--- a/crates/core/src/file_group/base_file.rs
+++ b/crates/core/src/file_group/base_file.rs
@@ -19,52 +19,85 @@
 use crate::error::CoreError;
 use crate::storage::file_metadata::FileMetadata;
 use crate::Result;
+use std::str::FromStr;
 
 /// Hudi Base file, part of a [FileSlice].
 #[derive(Clone, Debug)]
 pub struct BaseFile {
-    /// The file name of the base file.
-    pub file_name: String,
-
-    /// The id of the enclosing file group.
+    /// The id of the enclosing [FileGroup].
     pub file_id: String,
 
-    /// The associated instant time of the base file.
-    pub instant_time: String,
+    /// Monotonically increasing token for every attempt to write the 
[BaseFile].
+    pub write_token: String,
+
+    /// The timestamp of the commit instant in the Timeline that created the 
[BaseFile].
+    pub commit_timestamp: String,
+
+    /// File extension that matches to 
[crate::config::table::HudiTableConfig::BaseFileFormat].
+    ///
+    /// See also [crate::config::table::BaseFileFormatValue].
+    pub extension: String,
 
     /// The metadata about the file.
     pub file_metadata: Option<FileMetadata>,
 }
 
 impl BaseFile {
-    /// Parse file name and extract `file_id` and `instant_time`.
-    fn parse_file_name(file_name: &str) -> Result<(String, String)> {
+    /// Parse a base file's name into parts.
+    ///
+    /// File name format:
+    ///
+    /// ```text
+    /// [File Id]_[File Write Token]_[Commit timestamp].[File Extension]
+    /// ```
+    fn parse_file_name(file_name: &str) -> Result<(String, String, String, 
String)> {
         let err_msg = format!("Failed to parse file name '{file_name}' for 
base file.");
-        let (name, _) = file_name
+        let (stem, extension) = file_name
             .rsplit_once('.')
             .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
-        let parts: Vec<&str> = name.split('_').collect();
+        let parts: Vec<&str> = stem.split('_').collect();
         let file_id = parts
             .first()
             .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
             .to_string();
-        let instant_time = parts
+        let write_token = parts
+            .get(1)
+            .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
+            .to_string();
+        let commit_timestamp = parts
             .get(2)
             .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
             .to_string();
-        Ok((file_id, instant_time))
+        Ok((
+            file_id,
+            write_token,
+            commit_timestamp,
+            extension.to_string(),
+        ))
+    }
+
+    #[inline]
+    pub fn file_name(&self) -> String {
+        format!(
+            "{file_id}_{write_token}_{commit_timestamp}.{extension}",
+            file_id = self.file_id,
+            write_token = self.write_token,
+            commit_timestamp = self.commit_timestamp,
+            extension = self.extension,
+        )
     }
 }
 
-impl TryFrom<&str> for BaseFile {
-    type Error = CoreError;
+impl FromStr for BaseFile {
+    type Err = CoreError;
 
-    fn try_from(file_name: &str) -> Result<Self> {
-        let (file_id, instant_time) = Self::parse_file_name(file_name)?;
+    fn from_str(file_name: &str) -> Result<Self, Self::Err> {
+        let (file_id, write_token, commit_timestamp, extension) = 
Self::parse_file_name(file_name)?;
         Ok(Self {
-            file_name: file_name.to_string(),
             file_id,
-            instant_time,
+            write_token,
+            commit_timestamp,
+            extension,
             file_metadata: None,
         })
     }
@@ -74,12 +107,13 @@ impl TryFrom<FileMetadata> for BaseFile {
     type Error = CoreError;
 
     fn try_from(metadata: FileMetadata) -> Result<Self> {
-        let file_name = metadata.name.clone();
-        let (file_id, instant_time) = Self::parse_file_name(&file_name)?;
+        let file_name = metadata.name.as_str();
+        let (file_id, write_token, commit_timestamp, extension) = 
Self::parse_file_name(file_name)?;
         Ok(Self {
-            file_name,
             file_id,
-            instant_time,
+            write_token,
+            commit_timestamp,
+            extension,
             file_metadata: Some(metadata),
         })
     }
@@ -93,9 +127,9 @@ mod tests {
     #[test]
     fn test_create_base_file_from_file_name() {
         let file_name = 
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
-        let base_file = BaseFile::try_from(file_name).unwrap();
+        let base_file = BaseFile::from_str(file_name).unwrap();
         assert_eq!(base_file.file_id, 
"5a226868-2934-4f84-a16f-55124630c68d-0");
-        assert_eq!(base_file.instant_time, "20240402144910683");
+        assert_eq!(base_file.commit_timestamp, "20240402144910683");
         assert!(base_file.file_metadata.is_none());
     }
 
@@ -107,7 +141,7 @@ mod tests {
         );
         let base_file = BaseFile::try_from(metadata).unwrap();
         assert_eq!(base_file.file_id, 
"5a226868-2934-4f84-a16f-55124630c68d-0");
-        assert_eq!(base_file.instant_time, "20240402144910683");
+        assert_eq!(base_file.commit_timestamp, "20240402144910683");
         let file_metadata = base_file.file_metadata.unwrap();
         assert_eq!(file_metadata.size, 1024);
         assert_not!(file_metadata.fully_populated);
@@ -115,10 +149,10 @@ mod tests {
 
     #[test]
     fn create_a_base_file_returns_error() {
-        let result = BaseFile::try_from("no_file_extension");
+        let result = BaseFile::from_str("no_file_extension");
         assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
 
-        let result = BaseFile::try_from(".parquet");
+        let result = BaseFile::from_str(".parquet");
         assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
 
         let metadata = FileMetadata::new("no-valid-delimiter.parquet", 1024);
diff --git a/crates/core/src/file_group/file_slice.rs 
b/crates/core/src/file_group/file_slice.rs
new file mode 100644
index 0000000..c8ca359
--- /dev/null
+++ b/crates/core/src/file_group/file_slice.rs
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::file_group::base_file::BaseFile;
+use crate::file_group::log_file::LogFile;
+use crate::storage::Storage;
+use crate::Result;
+use std::collections::BTreeSet;
+use std::path::PathBuf;
+
+/// Within a [crate::file_group::FileGroup],
+/// a [FileSlice] is a logical group of [BaseFile] and [LogFile]s.
+#[derive(Clone, Debug)]
+pub struct FileSlice {
+    pub base_file: BaseFile,
+    pub log_files: BTreeSet<LogFile>,
+    pub partition_path: Option<String>,
+}
+
+impl FileSlice {
+    pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
+        Self {
+            base_file,
+            log_files: BTreeSet::new(),
+            partition_path,
+        }
+    }
+
+    fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
+        let path = PathBuf::from(self.partition_path()).join(file_name);
+        path.to_str().map(|s| s.to_string()).ok_or_else(|| {
+            CoreError::FileGroup(format!("Failed to get relative path for 
file: {file_name}",))
+        })
+    }
+
+    /// Returns the relative path of the [BaseFile] in the [FileSlice].
+    pub fn base_file_relative_path(&self) -> Result<String> {
+        let file_name = &self.base_file.file_name();
+        self.relative_path_for_file(file_name)
+    }
+
+    /// Returns the relative path of the given [LogFile] in the [FileSlice].
+    pub fn log_file_relative_path(&self, log_file: &LogFile) -> Result<String> 
{
+        let file_name = &log_file.file_name();
+        self.relative_path_for_file(file_name)
+    }
+
+    /// Returns the enclosing [FileGroup]'s id.
+    #[inline]
+    pub fn file_id(&self) -> &str {
+        &self.base_file.file_id
+    }
+
+    /// Returns the partition path of the [FileSlice].
+    #[inline]
+    pub fn partition_path(&self) -> &str {
+        self.partition_path.as_deref().unwrap_or_default()
+    }
+
+    /// Returns the instant time that marks the [FileSlice] creation.
+    ///
+    /// This is also an instant time stored in the [Timeline].
+    #[inline]
+    pub fn creation_instant_time(&self) -> &str {
+        &self.base_file.commit_timestamp
+    }
+
+    /// Load [FileMetadata] from storage layer for the [BaseFile] if 
`file_metadata` is [None]
+    /// or if `file_metadata` is not fully populated.
+    pub async fn load_metadata_if_needed(&mut self, storage: &Storage) -> 
Result<()> {
+        if let Some(metadata) = &self.base_file.file_metadata {
+            if metadata.fully_populated {
+                return Ok(());
+            }
+        }
+        let relative_path = self.base_file_relative_path()?;
+        let fetched_metadata = 
storage.get_file_metadata(&relative_path).await?;
+        self.base_file.file_metadata = Some(fetched_metadata);
+        Ok(())
+    }
+}
diff --git a/crates/core/src/file_group/log_file/log_block.rs 
b/crates/core/src/file_group/log_file/log_block.rs
index 387b709..ab59a40 100644
--- a/crates/core/src/file_group/log_file/log_block.rs
+++ b/crates/core/src/file_group/log_file/log_block.rs
@@ -129,6 +129,7 @@ impl TryFrom<[u8; 4]> for BlockMetadataKey {
     }
 }
 
+#[allow(dead_code)]
 #[derive(Debug, Clone)]
 pub struct LogBlock {
     pub format_version: LogFormatVersion,
diff --git a/crates/core/src/file_group/log_file/mod.rs 
b/crates/core/src/file_group/log_file/mod.rs
index 46dd403..2012861 100644
--- a/crates/core/src/file_group/log_file/mod.rs
+++ b/crates/core/src/file_group/log_file/mod.rs
@@ -17,35 +17,37 @@
  * under the License.
  */
 use crate::error::CoreError;
+use crate::storage::file_metadata::FileMetadata;
 use crate::Result;
+use std::cmp::Ordering;
 use std::str::FromStr;
 
 mod log_block;
 mod log_format;
 pub mod reader;
 
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug)]
 pub struct LogFile {
-    file_id: String,
-    base_commit_timestamp: String,
-    log_file_extension: String,
-    log_file_version: String,
-    file_write_token: String,
+    pub file_id: String,
+    pub base_commit_timestamp: String,
+    pub extension: String,
+    pub version: String,
+    pub write_token: String,
+    pub file_metadata: Option<FileMetadata>,
 }
 
 const LOG_FILE_PREFIX: char = '.';
 
-impl FromStr for LogFile {
-    type Err = CoreError;
-
-    /// Parse a log file name into a [LogFile].
+impl LogFile {
+    /// Parse a log file's name into parts.
     ///
     /// File name format:
     ///
     /// ```text
     /// .[File Id]_[Base Commit Timestamp].[Log File Extension].[Log File 
Version]_[File Write Token]
     /// ```
-    fn from_str(file_name: &str) -> Result<Self, Self::Err> {
+    /// TODO support `.cdc` suffix
+    fn parse_file_name(file_name: &str) -> Result<(String, String, String, 
String, String)> {
         let err_msg = format!("Failed to parse file name '{file_name}' for log 
file.");
 
         if !file_name.starts_with(LOG_FILE_PREFIX) {
@@ -80,32 +82,90 @@ impl FromStr for LogFile {
             return Err(CoreError::FileGroup(err_msg.clone()));
         }
 
-        Ok(LogFile {
-            file_id: file_id.to_string(),
-            base_commit_timestamp: base_commit_timestamp.to_string(),
-            log_file_extension: log_file_extension.to_string(),
-            log_file_version: log_file_version.to_string(),
-            file_write_token: file_write_token.to_string(),
-        })
+        Ok((
+            file_id.to_string(),
+            base_commit_timestamp.to_string(),
+            log_file_extension.to_string(),
+            log_file_version.to_string(),
+            file_write_token.to_string(),
+        ))
     }
-}
 
-impl LogFile {
-    /// Returns the file name of the log file.
     #[inline]
     pub fn file_name(&self) -> String {
         format!(
-            
"{prefix}{file_id}_{base_commit_timestamp}.{log_file_extension}.{log_file_version}_{file_write_token}",
+            
"{prefix}{file_id}_{base_commit_timestamp}.{extension}.{version}_{write_token}",
             prefix = LOG_FILE_PREFIX,
             file_id = self.file_id,
             base_commit_timestamp = self.base_commit_timestamp,
-            log_file_extension = self.log_file_extension,
-            log_file_version = self.log_file_version,
-            file_write_token = self.file_write_token
+            extension = self.extension,
+            version = self.version,
+            write_token = self.write_token
         )
     }
 }
 
+impl FromStr for LogFile {
+    type Err = CoreError;
+
+    /// Parse a log file name into a [LogFile].
+    fn from_str(file_name: &str) -> Result<Self, Self::Err> {
+        let (file_id, base_commit_timestamp, extension, version, write_token) =
+            Self::parse_file_name(file_name)?;
+        Ok(LogFile {
+            file_id,
+            base_commit_timestamp,
+            extension,
+            version,
+            write_token,
+            file_metadata: None,
+        })
+    }
+}
+
+impl TryFrom<FileMetadata> for LogFile {
+    type Error = CoreError;
+
+    fn try_from(metadata: FileMetadata) -> Result<Self> {
+        let file_name = metadata.name.as_str();
+        let (file_id, base_commit_timestamp, extension, version, write_token) =
+            Self::parse_file_name(file_name)?;
+        Ok(LogFile {
+            file_id,
+            base_commit_timestamp,
+            extension,
+            version,
+            write_token,
+            file_metadata: Some(metadata),
+        })
+    }
+}
+
+impl PartialEq for LogFile {
+    fn eq(&self, other: &Self) -> bool {
+        self.file_name() == other.file_name()
+    }
+}
+
+impl Eq for LogFile {}
+
+impl PartialOrd for LogFile {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for LogFile {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // Compare fields in order: base_commit_timestamp, version, write_token
+        // TODO support `.cdc` suffix
+        self.base_commit_timestamp
+            .cmp(&other.base_commit_timestamp)
+            .then(self.version.cmp(&other.version))
+            .then(self.write_token.cmp(&other.write_token))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -117,9 +177,9 @@ mod tests {
 
         assert_eq!(log_file.file_id, "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0");
         assert_eq!(log_file.base_commit_timestamp, "20250109233025121");
-        assert_eq!(log_file.log_file_extension, "log");
-        assert_eq!(log_file.log_file_version, "1");
-        assert_eq!(log_file.file_write_token, "0-51-115");
+        assert_eq!(log_file.extension, "log");
+        assert_eq!(log_file.version, "1");
+        assert_eq!(log_file.write_token, "0-51-115");
     }
 
     #[test]
@@ -182,4 +242,57 @@ mod tests {
             ));
         }
     }
+
+    #[test]
+    fn test_log_file_ordering() {
+        // Same timestamp, different version
+        let log1 = LogFile {
+            file_id: "ee2ace10-7667-40f5-9848-0a144b5ea064-0".to_string(),
+            base_commit_timestamp: "20250113230302428".to_string(),
+            extension: "log".to_string(),
+            version: "1".to_string(),
+            write_token: "0-188-387".to_string(),
+            file_metadata: None,
+        };
+
+        let log2 = LogFile {
+            file_id: "ee2ace10-7667-40f5-9848-0a144b5ea064-0".to_string(),
+            base_commit_timestamp: "20250113230302428".to_string(),
+            extension: "log".to_string(),
+            version: "2".to_string(),
+            write_token: "0-188-387".to_string(),
+            file_metadata: None,
+        };
+
+        // Different timestamp
+        let log3 = LogFile {
+            file_id: "ee2ace10-7667-40f5-9848-0a144b5ea064-0".to_string(),
+            base_commit_timestamp: "20250113230424191".to_string(),
+            extension: "log".to_string(),
+            version: "1".to_string(),
+            write_token: "0-188-387".to_string(),
+            file_metadata: None,
+        };
+
+        // Same timestamp and version, different write token
+        let log4 = LogFile {
+            file_id: "ee2ace10-7667-40f5-9848-0a144b5ea064-0".to_string(),
+            base_commit_timestamp: "20250113230302428".to_string(),
+            extension: "log".to_string(),
+            version: "1".to_string(),
+            write_token: "1-188-387".to_string(),
+            file_metadata: None,
+        };
+
+        // Test ordering
+        assert!(log1 < log2, "version ordering failed"); // version comparison
+        assert!(log1 < log3, "timestamp ordering failed"); // timestamp 
comparison
+        assert!(log2 < log3, "timestamp ordering failed"); // timestamp 
comparison
+        assert!(log1 < log4, "write token ordering failed"); // write token 
comparison
+
+        // Test sorting a vector of log files
+        let mut logs = vec![log3.clone(), log4.clone(), log1.clone(), 
log2.clone()];
+        logs.sort();
+        assert_eq!(logs, vec![log1, log4, log2, log3]);
+    }
 }
diff --git a/crates/core/src/file_group/log_file/reader.rs 
b/crates/core/src/file_group/log_file/reader.rs
index 84b848b..8a0d439 100644
--- a/crates/core/src/file_group/log_file/reader.rs
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -25,6 +25,7 @@ use 
crate::file_group::log_file::log_format::{LogFormatVersion, MAGIC};
 use crate::storage::reader::StorageReader;
 use crate::storage::Storage;
 use crate::Result;
+use arrow_array::RecordBatch;
 use bytes::BytesMut;
 use std::collections::HashMap;
 use std::io::{self, Read, Seek};
@@ -50,13 +51,22 @@ impl LogFileReader<StorageReader> {
         })
     }
 
-    pub fn read_all_blocks(mut self) -> Result<Vec<LogBlock>> {
+    fn read_all_blocks(&mut self) -> Result<Vec<LogBlock>> {
         let mut blocks = Vec::new();
         while let Some(block) = self.read_next_block()? {
             blocks.push(block);
         }
         Ok(blocks)
     }
+
+    pub fn read_all_records_unmerged(&mut self) -> Result<Vec<RecordBatch>> {
+        let all_blocks = self.read_all_blocks()?;
+        let mut batches = Vec::new();
+        for block in all_blocks {
+            batches.extend_from_slice(&block.record_batches);
+        }
+        Ok(batches)
+    }
 }
 
 impl<R: Read + Seek> LogFileReader<R> {
@@ -256,7 +266,7 @@ mod tests {
         let (dir, file_name) = get_sample_log_file();
         let dir_url = Url::from_directory_path(dir).unwrap();
         let storage = Storage::new_with_base_url(dir_url).unwrap();
-        let reader = LogFileReader::new(storage, &file_name).await.unwrap();
+        let mut reader = LogFileReader::new(storage, 
&file_name).await.unwrap();
         let blocks = reader.read_all_blocks().unwrap();
         assert_eq!(blocks.len(), 1);
 
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index fce9786..7d3bcc7 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -22,82 +22,20 @@
 
 pub mod base_file;
 pub mod builder;
+pub mod file_slice;
 pub mod log_file;
 pub mod reader;
 
 use crate::error::CoreError;
 use crate::file_group::base_file::BaseFile;
-use crate::storage::Storage;
+use crate::file_group::log_file::LogFile;
 use crate::Result;
+use file_slice::FileSlice;
 use std::collections::BTreeMap;
 use std::fmt;
 use std::fmt::Formatter;
 use std::hash::{Hash, Hasher};
-use std::path::PathBuf;
-
-/// Within a [FileGroup], a [FileSlice] is a logical group of [BaseFile] and 
log files.
-///
-/// [note] The log files are not yet supported.
-#[derive(Clone, Debug)]
-pub struct FileSlice {
-    pub base_file: BaseFile,
-    pub partition_path: Option<String>,
-}
-
-impl FileSlice {
-    pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
-        Self {
-            base_file,
-            partition_path,
-        }
-    }
-
-    /// Returns the relative path of the base file.
-    pub fn base_file_relative_path(&self) -> Result<String> {
-        let file_name = &self.base_file.file_name;
-        let path = PathBuf::from(self.partition_path()).join(file_name);
-        path.to_str().map(|s| s.to_string()).ok_or_else(|| {
-            CoreError::FileGroup(format!(
-                "Failed to get base file relative path for file slice: {:?}",
-                self
-            ))
-        })
-    }
-
-    /// Returns the enclosing [FileGroup]'s id.
-    #[inline]
-    pub fn file_id(&self) -> &str {
-        &self.base_file.file_id
-    }
-
-    /// Returns the partition path of the [FileSlice].
-    #[inline]
-    pub fn partition_path(&self) -> &str {
-        self.partition_path.as_deref().unwrap_or_default()
-    }
-
-    /// Returns the instant time that marks the [FileSlice] creation.
-    ///
-    /// This is also an instant time stored in the [Timeline].
-    #[inline]
-    pub fn creation_instant_time(&self) -> &str {
-        &self.base_file.instant_time
-    }
-
-    /// Load [FileMetadata] from storage layer for the [BaseFile] if 
`file_metadata` is [None]
-    /// or if `file_metadata` is not fully populated.
-    pub async fn load_metadata_if_needed(&mut self, storage: &Storage) -> 
Result<()> {
-        if let Some(metadata) = &self.base_file.file_metadata {
-            if metadata.fully_populated {
-                return Ok(());
-            }
-        }
-        let relative_path = self.base_file_relative_path()?;
-        let fetched_metadata = 
storage.get_file_metadata(&relative_path).await?;
-        self.base_file.file_metadata = Some(fetched_metadata);
-        Ok(())
-    }
-}
+use std::str::FromStr;
 
 /// Hudi File Group.
 #[derive(Clone, Debug)]
@@ -154,26 +92,67 @@ impl FileGroup {
     }
 
     pub fn add_base_file_from_name(&mut self, file_name: &str) -> 
Result<&Self> {
-        let base_file = BaseFile::try_from(file_name)?;
+        let base_file = BaseFile::from_str(file_name)?;
         self.add_base_file(base_file)
     }
 
     pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
-        let instant_time = base_file.instant_time.as_str();
-        if self.file_slices.contains_key(instant_time) {
+        let commit_timestamp = base_file.commit_timestamp.as_str();
+        if self.file_slices.contains_key(commit_timestamp) {
             Err(CoreError::FileGroup(format!(
-                "Instant time {instant_time} is already present in File Group 
{}",
+                "Instant time {commit_timestamp} is already present in File 
Group {}",
                 self.file_id
             )))
         } else {
             self.file_slices.insert(
-                instant_time.to_owned(),
+                commit_timestamp.to_owned(),
                 FileSlice::new(base_file, self.partition_path.clone()),
             );
             Ok(self)
         }
     }
 
+    pub fn add_base_files<I>(&mut self, base_files: I) -> Result<&Self>
+    where
+        I: IntoIterator<Item = BaseFile>,
+    {
+        for base_file in base_files {
+            self.add_base_file(base_file)?;
+        }
+        Ok(self)
+    }
+
+    pub fn add_log_file_from_name(&mut self, file_name: &str) -> Result<&Self> 
{
+        let log_file = LogFile::from_str(file_name)?;
+        self.add_log_file(log_file)
+    }
+
+    /// Add a [LogFile] to the [FileGroup].
+    ///
+    /// TODO: support adding log files to file group without base files.
+    pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> {
+        let commit_timestamp = log_file.base_commit_timestamp.as_str();
+        if let Some(file_slice) = self.file_slices.get_mut(commit_timestamp) {
+            file_slice.log_files.insert(log_file);
+            Ok(self)
+        } else {
+            Err(CoreError::FileGroup(format!(
+                "Instant time {commit_timestamp} not found in File Group {}",
+                self.file_id
+            )))
+        }
+    }
+
+    pub fn add_log_files<I>(&mut self, log_files: I) -> Result<&Self>
+    where
+        I: IntoIterator<Item = LogFile>,
+    {
+        for log_file in log_files {
+            self.add_log_file(log_file)?;
+        }
+        Ok(self)
+    }
+
     pub fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&FileSlice> {
         let as_of = timestamp.to_string();
         if let Some((_, file_slice)) = 
self.file_slices.range(..=as_of).next_back() {
@@ -214,7 +193,7 @@ mod tests {
             fg.get_file_slice_as_of("20240402123035233")
                 .unwrap()
                 .base_file
-                .instant_time,
+                .commit_timestamp,
             "20240402123035233"
         );
         assert!(fg.get_file_slice_as_of("-1").is_none());
diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
index 49200f7..387b591 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -21,7 +21,7 @@ use crate::config::util::split_hudi_options_from_others;
 use crate::config::HudiConfigs;
 use crate::error::CoreError::ReadFileSliceError;
 use crate::expr::filter::{Filter, SchemableFilter};
-use crate::file_group::FileSlice;
+use crate::file_group::file_slice::FileSlice;
 use crate::storage::Storage;
 use crate::Result;
 use arrow::compute::and;
@@ -30,25 +30,30 @@ use arrow_schema::Schema;
 use futures::TryFutureExt;
 use std::sync::Arc;
 
+use crate::file_group::log_file::reader::LogFileReader;
+use crate::merge::record_merger::RecordMerger;
 use arrow::compute::filter_record_batch;
 
 /// File group reader handles all read operations against a file group.
 #[derive(Clone, Debug)]
 pub struct FileGroupReader {
     storage: Arc<Storage>,
+    hudi_configs: Arc<HudiConfigs>,
     and_filters: Vec<SchemableFilter>,
 }
 
 impl FileGroupReader {
-    pub fn new(storage: Arc<Storage>) -> Self {
+    pub fn new(storage: Arc<Storage>, hudi_configs: Arc<HudiConfigs>) -> Self {
         Self {
             storage,
+            hudi_configs,
             and_filters: Vec::new(),
         }
     }
 
     pub fn new_with_filters(
         storage: Arc<Storage>,
+        hudi_configs: Arc<HudiConfigs>,
         and_filters: &[Filter],
         schema: &Schema,
     ) -> Result<Self> {
@@ -59,6 +64,7 @@ impl FileGroupReader {
 
         Ok(Self {
             storage,
+            hudi_configs,
             and_filters,
         })
     }
@@ -77,11 +83,8 @@ impl FileGroupReader {
 
         let hudi_configs = Arc::new(HudiConfigs::new(hudi_opts));
 
-        let storage = Storage::new(Arc::new(others), hudi_configs)?;
-        Ok(Self {
-            storage,
-            and_filters: Vec::new(),
-        })
+        let storage = Storage::new(Arc::new(others), hudi_configs.clone())?;
+        Ok(Self::new(storage, hudi_configs))
     }
 
     fn create_boolean_array_mask(&self, records: &RecordBatch) -> 
Result<BooleanArray> {
@@ -117,9 +120,33 @@ impl FileGroupReader {
             .map_err(|e| ReadFileSliceError(format!("Failed to filter records: 
{e:?}")))
     }
 
-    pub async fn read_file_slice(&self, file_slice: &FileSlice) -> 
Result<RecordBatch> {
+    pub async fn read_file_slice(
+        &self,
+        file_slice: &FileSlice,
+        base_file_only: bool,
+    ) -> Result<RecordBatch> {
         let relative_path = file_slice.base_file_relative_path()?;
-        self.read_file_slice_by_base_file_path(&relative_path).await
+        if base_file_only {
+            // TODO caller to support read optimized queries
+            self.read_file_slice_by_base_file_path(&relative_path).await
+        } else {
+            let base_file_records = self
+                .read_file_slice_by_base_file_path(&relative_path)
+                .await?;
+            let schema = base_file_records.schema();
+            let mut all_records = vec![base_file_records];
+
+            for log_file in &file_slice.log_files {
+                let relative_path = 
file_slice.log_file_relative_path(log_file)?;
+                let storage = self.storage.clone();
+                let mut log_file_reader = LogFileReader::new(storage, 
&relative_path).await?;
+                let log_file_records = 
log_file_reader.read_all_records_unmerged()?;
+                all_records.extend_from_slice(&log_file_records);
+            }
+
+            let merger = RecordMerger::new(self.hudi_configs.clone());
+            merger.merge_record_batches(&schema, &all_records)
+        }
     }
 }
 
@@ -138,7 +165,7 @@ mod tests {
     fn test_new() {
         let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
         let storage = Storage::new_with_base_url(base_url).unwrap();
-        let fg_reader = FileGroupReader::new(storage.clone());
+        let fg_reader = FileGroupReader::new(storage.clone(), 
Arc::from(HudiConfigs::empty()));
         assert!(Arc::ptr_eq(&fg_reader.storage, &storage));
     }
 
@@ -155,9 +182,15 @@ mod tests {
         let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
         let storage = Storage::new_with_base_url(base_url)?;
         let schema = create_test_schema();
+        let empty_configs = Arc::new(HudiConfigs::empty());
 
         // Test case 1: Empty filters
-        let reader = FileGroupReader::new_with_filters(storage.clone(), &[], 
&schema)?;
+        let reader = FileGroupReader::new_with_filters(
+            storage.clone(),
+            empty_configs.clone(),
+            &[],
+            &schema,
+        )?;
         assert!(reader.and_filters.is_empty());
 
         // Test case 2: Multiple filters
@@ -165,14 +198,23 @@ mod tests {
             FilterField::new("_hoodie_commit_time").gt("0"),
             FilterField::new("age").gte("18"),
         ];
-        let reader = FileGroupReader::new_with_filters(storage.clone(), 
&filters, &schema)?;
+        let reader = FileGroupReader::new_with_filters(
+            storage.clone(),
+            empty_configs.clone(),
+            &filters,
+            &schema,
+        )?;
         assert_eq!(reader.and_filters.len(), 2);
 
         // Test case 3: Invalid field name should error
         let invalid_filters = 
vec![FilterField::new("non_existent_field").eq("value")];
-        assert!(
-            FileGroupReader::new_with_filters(storage.clone(), 
&invalid_filters, &schema).is_err()
-        );
+        assert!(FileGroupReader::new_with_filters(
+            storage.clone(),
+            empty_configs.clone(),
+            &invalid_filters,
+            &schema
+        )
+        .is_err());
 
         Ok(())
     }
@@ -194,7 +236,8 @@ mod tests {
         let storage =
             
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())
                 .unwrap();
-        let reader = FileGroupReader::new(storage);
+        let empty_configs = Arc::new(HudiConfigs::empty());
+        let reader = FileGroupReader::new(storage, empty_configs.clone());
         let result = reader
             .read_file_slice_by_base_file_path("non_existent_file")
             .await;
@@ -217,17 +260,28 @@ mod tests {
     fn test_create_boolean_array_mask() -> Result<()> {
         let storage =
             
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())?;
+        let empty_configs = Arc::new(HudiConfigs::empty());
         let schema = create_test_schema();
         let records = create_test_record_batch()?;
 
         // Test case 1: No filters
-        let reader = FileGroupReader::new_with_filters(storage.clone(), &[], 
&schema)?;
+        let reader = FileGroupReader::new_with_filters(
+            storage.clone(),
+            empty_configs.clone(),
+            &[],
+            &schema,
+        )?;
         let mask = reader.create_boolean_array_mask(&records)?;
         assert_eq!(mask, BooleanArray::from(vec![true; 5]));
 
         // Test case 2: Single filter on commit time
         let filters = vec![FilterField::new("_hoodie_commit_time").gt("2")];
-        let reader = FileGroupReader::new_with_filters(storage.clone(), 
&filters, &schema)?;
+        let reader = FileGroupReader::new_with_filters(
+            storage.clone(),
+            empty_configs.clone(),
+            &filters,
+            &schema,
+        )?;
         let mask = reader.create_boolean_array_mask(&records)?;
         assert_eq!(
             mask,
@@ -240,7 +294,12 @@ mod tests {
             FilterField::new("_hoodie_commit_time").gt("2"),
             FilterField::new("age").lt("40"),
         ];
-        let reader = FileGroupReader::new_with_filters(storage.clone(), 
&filters, &schema)?;
+        let reader = FileGroupReader::new_with_filters(
+            storage.clone(),
+            empty_configs.clone(),
+            &filters,
+            &schema,
+        )?;
         let mask = reader.create_boolean_array_mask(&records)?;
         assert_eq!(
             mask,
@@ -250,7 +309,12 @@ mod tests {
 
         // Test case 4: Filter resulting in all false
         let filters = vec![FilterField::new("age").gt("100")];
-        let reader = FileGroupReader::new_with_filters(storage.clone(), 
&filters, &schema)?;
+        let reader = FileGroupReader::new_with_filters(
+            storage.clone(),
+            empty_configs.clone(),
+            &filters,
+            &schema,
+        )?;
         let mask = reader.create_boolean_array_mask(&records)?;
         assert_eq!(mask, BooleanArray::from(vec![false; 5]));
 
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 85f83b9..ce7695b 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -30,7 +30,7 @@ use futures::StreamExt;
 use object_store::path::Path as ObjPath;
 use object_store::{parse_url_opts, ObjectStore};
 use parquet::arrow::async_reader::ParquetObjectReader;
-use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder};
 use parquet::file::metadata::ParquetMetaData;
 use url::Url;
 
@@ -154,6 +154,17 @@ impl Storage {
         Ok(builder.metadata().as_ref().clone())
     }
 
+    pub async fn get_parquet_file_schema(
+        &self,
+        relative_path: &str,
+    ) -> Result<arrow::datatypes::Schema> {
+        let parquet_meta = 
self.get_parquet_file_metadata(relative_path).await?;
+        Ok(parquet_to_arrow_schema(
+            parquet_meta.file_metadata().schema_descr(),
+            None,
+        )?)
+    }
+
     pub async fn get_file_data(&self, relative_path: &str) -> Result<Bytes> {
         let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 0cf2b00..853ef1b 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -22,17 +22,15 @@ use std::collections::HashMap;
 use std::env;
 use std::hash::Hash;
 use std::path::PathBuf;
-use std::str::FromStr;
 use std::sync::Arc;
 use strum::IntoEnumIterator;
 
 use crate::config::internal::HudiInternalConfig::SkipConfigValidation;
 use crate::config::read::HudiReadConfig;
+use crate::config::table::HudiTableConfig;
 use crate::config::table::HudiTableConfig::{
-    DropsPartitionFields, TableType, TableVersion, TimelineLayoutVersion,
+    DropsPartitionFields, TableVersion, TimelineLayoutVersion,
 };
-use crate::config::table::TableTypeValue::CopyOnWrite;
-use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::util::{parse_data_for_options, 
split_hudi_options_from_others};
 use crate::config::{HudiConfigs, HUDI_CONF_DIR};
 use crate::error::CoreError;
@@ -254,13 +252,6 @@ impl TableBuilder {
         }
 
         // additional validation
-        let table_type = hudi_configs.get(TableType)?.to::<String>();
-        if TableTypeValue::from_str(&table_type)? != CopyOnWrite {
-            return Err(CoreError::Unsupported(
-                "Only support copy-on-write table.".to_string(),
-            ));
-        }
-
         let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
         if !(5..=6).contains(&table_version) {
             return Err(CoreError::Unsupported(
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 455e823..0abbe08 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -22,13 +22,15 @@ use std::sync::Arc;
 
 use crate::config::HudiConfigs;
 use crate::file_group::base_file::BaseFile;
-use crate::file_group::{FileGroup, FileSlice};
+use crate::file_group::FileGroup;
 use crate::storage::{get_leaf_dirs, Storage};
 
 use crate::config::read::HudiReadConfig::ListingParallelism;
+use crate::config::table::HudiTableConfig::BaseFileFormat;
 use crate::error::CoreError;
-use crate::storage::file_metadata::FileMetadata;
-use crate::table::partition::PartitionPruner;
+use crate::file_group::file_slice::FileSlice;
+use crate::file_group::log_file::LogFile;
+use crate::table::partition::{PartitionPruner, PARTITION_METAFIELD_PREFIX};
 use crate::Result;
 use dashmap::DashMap;
 use futures::stream::{self, StreamExt, TryStreamExt};
@@ -57,6 +59,10 @@ impl FileSystemView {
         })
     }
 
+    fn should_exclude_for_listing(file_name: &str) -> bool {
+        file_name.starts_with(PARTITION_METAFIELD_PREFIX) || 
file_name.ends_with(".crc")
+    }
+
     async fn list_all_partition_paths(storage: &Storage) -> 
Result<Vec<String>> {
         Self::list_partition_paths(storage, &PartitionPruner::empty()).await
     }
@@ -91,31 +97,61 @@ impl FileSystemView {
     async fn list_file_groups_for_partition(
         storage: &Storage,
         partition_path: &str,
+        base_file_format: &str,
     ) -> Result<Vec<FileGroup>> {
-        let file_metadata: Vec<FileMetadata> = storage
-            .list_files(Some(partition_path))
-            .await?
-            .into_iter()
-            .filter(|f| f.name.ends_with(".parquet"))
-            .collect();
+        let listed_file_metadata = 
storage.list_files(Some(partition_path)).await?;
+
+        let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
+        let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = 
HashMap::new();
+
+        for file_metadata in listed_file_metadata {
+            if Self::should_exclude_for_listing(&file_metadata.name) {
+                continue;
+            }
 
-        let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
-        for metadata in file_metadata {
-            let base_file = BaseFile::try_from(metadata)?;
-            let fg_id = &base_file.file_id;
-            fg_id_to_base_files
-                .entry(fg_id.to_owned())
-                .or_default()
-                .push(base_file);
+            let base_file_extension = format!(".{}", base_file_format);
+            if file_metadata.name.ends_with(&base_file_extension) {
+                // After excluding the unintended files,
+                // we expect a file that has the base file extension to be a 
valid base file.
+                let base_file = BaseFile::try_from(file_metadata)?;
+                let file_id = &base_file.file_id;
+                file_id_to_base_files
+                    .entry(file_id.to_owned())
+                    .or_default()
+                    .push(base_file);
+            } else {
+                match LogFile::try_from(file_metadata) {
+                    Ok(log_file) => {
+                        let file_id = &log_file.file_id;
+                        file_id_to_log_files
+                            .entry(file_id.to_owned())
+                            .or_default()
+                            .push(log_file);
+                    }
+                    Err(e) => {
+                        // We don't support cdc log files yet, hence skipping 
error when parsing
+                        // fails. However, once we support all data files, we 
should return error
+                        // here because we expect all files to be either base 
files or log files,
+                        // after excluding the unintended files.
+                        log::warn!("Failed to create a log file: {}", e);
+                        continue;
+                    }
+                }
+            }
         }
 
         let mut file_groups: Vec<FileGroup> = Vec::new();
-        for (fg_id, base_files) in fg_id_to_base_files.into_iter() {
-            let mut fg = FileGroup::new(fg_id.to_owned(), 
Some(partition_path.to_owned()));
-            for bf in base_files {
-                fg.add_base_file(bf)?;
-            }
-            file_groups.push(fg);
+        // TODO support creating file groups without base files
+        for (file_id, base_files) in file_id_to_base_files.into_iter() {
+            let mut file_group =
+                FileGroup::new(file_id.to_owned(), 
Some(partition_path.to_owned()));
+
+            file_group.add_base_files(base_files)?;
+
+            let log_files = 
file_id_to_log_files.remove(&file_id).unwrap_or_default();
+            file_group.add_log_files(log_files)?;
+
+            file_groups.push(file_group);
         }
         Ok(file_groups)
     }
@@ -129,15 +165,23 @@ impl FileSystemView {
             .filter(|p| partition_pruner.should_include(p))
             .collect::<HashSet<_>>();
 
+        let base_file_format = self
+            .hudi_configs
+            .get_or_default(BaseFileFormat)
+            .to::<String>();
         let parallelism = self
             .hudi_configs
             .get_or_default(ListingParallelism)
             .to::<usize>();
         stream::iter(partition_paths_to_list)
-            .map(|path| async move {
-                let file_groups =
-                    Self::list_file_groups_for_partition(&self.storage, 
&path).await?;
-                Ok::<_, CoreError>((path, file_groups))
+            .map(|path| {
+                let base_file_format = base_file_format.clone();
+                async move {
+                    let format = base_file_format.as_str();
+                    let file_groups =
+                        Self::list_file_groups_for_partition(&self.storage, 
&path, format).await?;
+                    Ok::<_, CoreError>((path, file_groups))
+                }
             })
             .buffer_unordered(parallelism)
             .try_for_each(|(path, file_groups)| async move {
@@ -194,7 +238,7 @@ mod tests {
     use crate::table::partition::PartitionPruner;
     use crate::table::Table;
 
-    use hudi_tests::TestTable;
+    use hudi_tests::SampleTable;
     use std::collections::{HashMap, HashSet};
     use std::sync::Arc;
     use url::Url;
@@ -210,7 +254,7 @@ mod tests {
 
     #[tokio::test]
     async fn get_partition_paths_for_nonpartitioned_table() {
-        let base_url = TestTable::V6Nonpartitioned.url();
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let storage = Storage::new_with_base_url(base_url).unwrap();
         let partition_pruner = PartitionPruner::empty();
         let partition_paths = FileSystemView::list_partition_paths(&storage, 
&partition_pruner)
@@ -223,7 +267,7 @@ mod tests {
 
     #[tokio::test]
     async fn get_partition_paths_for_complexkeygen_table() {
-        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+        let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
         let storage = Storage::new_with_base_url(base_url).unwrap();
         let partition_pruner = PartitionPruner::empty();
         let partition_paths = FileSystemView::list_partition_paths(&storage, 
&partition_pruner)
@@ -243,7 +287,7 @@ mod tests {
 
     #[tokio::test]
     async fn fs_view_get_latest_file_slices() {
-        let base_url = TestTable::V6Nonpartitioned.url();
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let fs_view = create_test_fs_view(base_url).await;
 
         assert!(fs_view.partition_to_file_groups.is_empty());
@@ -255,11 +299,11 @@ mod tests {
             .unwrap();
         assert_eq!(fs_view.partition_to_file_groups.len(), 1);
         assert_eq!(file_slices.len(), 1);
-        let fg_ids = file_slices
+        let file_ids = file_slices
             .iter()
             .map(|fsl| fsl.file_id())
             .collect::<Vec<_>>();
-        assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
+        assert_eq!(file_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
         for fsl in file_slices.iter() {
             
assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 4);
         }
@@ -267,7 +311,7 @@ mod tests {
 
     #[tokio::test]
     async fn fs_view_get_latest_file_slices_with_replace_commit() {
-        let base_url = 
TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
+        let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let fs_view = create_test_fs_view(base_url).await;
 
@@ -284,11 +328,11 @@ mod tests {
             .unwrap();
         assert_eq!(fs_view.partition_to_file_groups.len(), 3);
         assert_eq!(file_slices.len(), 1);
-        let fg_ids = file_slices
+        let file_ids = file_slices
             .iter()
             .map(|fsl| fsl.file_id())
             .collect::<Vec<_>>();
-        assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
+        assert_eq!(file_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
         for fsl in file_slices.iter() {
             
assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 1);
         }
@@ -296,7 +340,7 @@ mod tests {
 
     #[tokio::test]
     async fn fs_view_get_latest_file_slices_with_partition_filters() {
-        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+        let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let fs_view = create_test_fs_view(base_url).await;
 
@@ -325,11 +369,11 @@ mod tests {
         assert_eq!(fs_view.partition_to_file_groups.len(), 1);
         assert_eq!(file_slices.len(), 1);
 
-        let fg_ids = file_slices
+        let file_ids = file_slices
             .iter()
             .map(|fsl| fsl.file_id())
             .collect::<Vec<_>>();
-        assert_eq!(fg_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
+        assert_eq!(file_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
         for fsl in file_slices.iter() {
             
assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 2);
         }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 2c50fc7..0fdd46b 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -90,13 +90,13 @@ mod fs_view;
 pub mod partition;
 
 use crate::config::read::HudiReadConfig::AsOfTimestamp;
-use crate::config::table::HudiTableConfig;
 use crate::config::table::HudiTableConfig::PartitionFields;
+use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
 use crate::error::CoreError;
 use crate::expr::filter::{Filter, FilterField};
+use crate::file_group::file_slice::FileSlice;
 use crate::file_group::reader::FileGroupReader;
-use crate::file_group::FileSlice;
 use crate::table::builder::TableBuilder;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
@@ -106,6 +106,7 @@ use crate::Result;
 use arrow::record_batch::RecordBatch;
 use arrow_schema::{Field, Schema};
 use std::collections::{HashMap, HashSet};
+use std::str::FromStr;
 use std::sync::Arc;
 use url::Url;
 
@@ -165,6 +166,16 @@ impl Table {
             .register_object_store(runtime_env.clone());
     }
 
+    pub fn get_table_type(&self) -> TableTypeValue {
+        let err_msg = format!("{:?} is missing or invalid.", 
HudiTableConfig::TableType);
+        let table_type = self
+            .hudi_configs
+            .get(HudiTableConfig::TableType)
+            .expect(&err_msg)
+            .to::<String>();
+        TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
+    }
+
     /// Get the latest [Schema] of the table.
     pub async fn get_schema(&self) -> Result<Schema> {
         self.timeline.get_latest_schema().await
@@ -245,7 +256,10 @@ impl Table {
     }
 
     pub fn create_file_group_reader(&self) -> FileGroupReader {
-        FileGroupReader::new(self.file_system_view.storage.clone())
+        FileGroupReader::new(
+            self.file_system_view.storage.clone(),
+            self.hudi_configs.clone(),
+        )
     }
 
     pub async fn create_file_group_reader_with_filters(
@@ -253,7 +267,12 @@ impl Table {
         filters: &[Filter],
     ) -> Result<FileGroupReader> {
         let schema = self.get_schema().await?;
-        
FileGroupReader::new_with_filters(self.file_system_view.storage.clone(), 
filters, &schema)
+        FileGroupReader::new_with_filters(
+            self.file_system_view.storage.clone(),
+            self.hudi_configs.clone(),
+            filters,
+            &schema,
+        )
     }
 
     /// Get all the latest records in the table.
@@ -278,9 +297,13 @@ impl Table {
     ) -> Result<Vec<RecordBatch>> {
         let file_slices = self.get_file_slices_as_of(timestamp, 
filters).await?;
         let fg_reader = self.create_file_group_reader();
-        let batches =
-            futures::future::try_join_all(file_slices.iter().map(|f| 
fg_reader.read_file_slice(f)))
-                .await?;
+        let base_file_only = self.get_table_type() == 
TableTypeValue::CopyOnWrite;
+        let batches = futures::future::try_join_all(
+            file_slices
+                .iter()
+                .map(|f| fg_reader.read_file_slice(f, base_file_only)),
+        )
+        .await?;
         Ok(batches)
     }
 
@@ -315,9 +338,13 @@ impl Table {
             FilterField::new("_hoodie_commit_time").lte(as_of_timestamp),
         ];
         let fg_reader = 
self.create_file_group_reader_with_filters(filters).await?;
-        let batches =
-            futures::future::try_join_all(file_slices.iter().map(|f| 
fg_reader.read_file_slice(f)))
-                .await?;
+        let base_file_only = self.get_table_type() == 
TableTypeValue::CopyOnWrite;
+        let batches = futures::future::try_join_all(
+            file_slices
+                .iter()
+                .map(|f| fg_reader.read_file_slice(f, base_file_only)),
+        )
+        .await?;
         Ok(batches)
     }
 
@@ -346,7 +373,7 @@ mod tests {
     use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
     use crate::table::Filter;
-    use hudi_tests::{assert_not, TestTable};
+    use hudi_tests::{assert_not, SampleTable};
     use std::collections::HashSet;
     use std::fs::canonicalize;
     use std::path::PathBuf;
@@ -384,7 +411,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_hudi_table_get_hudi_options() {
-        let base_url = TestTable::V6Nonpartitioned.url();
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let hudi_options = hudi_table.hudi_options();
         for (k, v) in hudi_options.iter() {
@@ -395,7 +422,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_hudi_table_get_storage_options() {
-        let base_url = TestTable::V6Nonpartitioned.url();
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
 
         let cloud_prefixes: HashSet<_> = Storage::CLOUD_STORAGE_PREFIXES
@@ -422,7 +449,7 @@ mod tests {
 
     #[tokio::test]
     async fn hudi_table_get_schema() {
-        let base_url = TestTable::V6Nonpartitioned.url();
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let fields: Vec<String> = hudi_table
             .get_schema()
@@ -475,7 +502,7 @@ mod tests {
 
     #[tokio::test]
     async fn hudi_table_get_partition_schema() {
-        let base_url = TestTable::V6TimebasedkeygenNonhivestyle.url();
+        let base_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let fields: Vec<String> = hudi_table
             .get_partition_schema()
@@ -566,7 +593,10 @@ mod tests {
     async fn get_default_for_invalid_table_props() {
         let table = 
get_test_table_without_validation("table_props_invalid").await;
         let configs = table.hudi_configs;
-        assert!(panic::catch_unwind(|| 
configs.get_or_default(BaseFileFormat)).is_err());
+        assert_eq!(
+            configs.get_or_default(BaseFileFormat).to::<String>(),
+            "parquet"
+        );
         assert!(panic::catch_unwind(|| 
configs.get_or_default(Checksum)).is_err());
         assert_eq!(
             configs.get_or_default(DatabaseName).to::<String>(),
@@ -672,7 +702,7 @@ mod tests {
 
     #[tokio::test]
     async fn hudi_table_read_file_slice() {
-        let base_url = TestTable::V6Nonpartitioned.url();
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let batches = hudi_table
             .create_file_group_reader()
@@ -687,7 +717,7 @@ mod tests {
 
     #[tokio::test]
     async fn empty_hudi_table_get_file_slices_splits() {
-        let base_url = TestTable::V6Empty.url();
+        let base_url = SampleTable::V6Empty.url_to_cow();
 
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices_splits = hudi_table.get_file_slices_splits(2, 
&[]).await.unwrap();
@@ -696,7 +726,7 @@ mod tests {
 
     #[tokio::test]
     async fn hudi_table_get_file_slices_splits() {
-        let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
+        let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
 
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices_splits = hudi_table.get_file_slices_splits(2, 
&[]).await.unwrap();
@@ -707,7 +737,7 @@ mod tests {
 
     #[tokio::test]
     async fn hudi_table_get_file_slices_as_of_timestamps() {
-        let base_url = TestTable::V6Nonpartitioned.url();
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
 
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
@@ -764,7 +794,7 @@ mod tests {
 
     #[tokio::test]
     async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
-        let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
+        let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
 
@@ -814,7 +844,7 @@ mod tests {
 
     #[tokio::test]
     async fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
-        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+        let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
 
@@ -865,24 +895,77 @@ mod tests {
 
     mod test_snapshot_queries {
         use super::super::*;
-        use arrow_array::{Array, StringArray};
-        use hudi_tests::TestTable;
-        use std::collections::HashSet;
+        use crate::metadata::meta_field::MetaField;
+        use arrow_array::{Array, BooleanArray, Int32Array, StringArray};
+        use hudi_tests::SampleTable;
 
         #[tokio::test]
         async fn test_empty() -> Result<()> {
-            let base_url = TestTable::V6Empty.url();
-            let hudi_table = Table::new(base_url.path()).await?;
+            let base_urls = [
+                SampleTable::V6Empty.url_to_cow(),
+                SampleTable::V6Empty.url_to_mor(),
+            ];
+            for base_url in base_urls.iter() {
+                let hudi_table = Table::new(base_url.path()).await?;
+                let records = hudi_table.read_snapshot(&[]).await?;
+                assert!(records.is_empty());
+            }
+            Ok(())
+        }
 
-            let records = hudi_table.read_snapshot(&[]).await?;
-            assert!(records.is_empty());
+        #[tokio::test]
+        async fn test_non_partitioned() -> Result<()> {
+            let base_urls = [
+                SampleTable::V6Nonpartitioned.url_to_cow(),
+                SampleTable::V6Nonpartitioned.url_to_mor(),
+            ];
+            for base_url in base_urls.iter() {
+                let hudi_table = Table::new(base_url.path()).await?;
+                let records = hudi_table.read_snapshot(&[]).await?;
+                let all_records = 
arrow::compute::concat_batches(&records[0].schema(), &records)?;
+
+                let ids = all_records
+                    .column_by_name("id")
+                    .unwrap()
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap();
+                let names = all_records
+                    .column_by_name("name")
+                    .unwrap()
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .unwrap();
+                let is_actives = all_records
+                    .column_by_name("isActive")
+                    .unwrap()
+                    .as_any()
+                    .downcast_ref::<BooleanArray>()
+                    .unwrap();
 
+                let mut data: Vec<(i32, &str, bool)> = ids
+                    .iter()
+                    .zip(names.iter())
+                    .zip(is_actives.iter())
+                    .map(|((id, name), is_active)| (id.unwrap(), 
name.unwrap(), is_active.unwrap()))
+                    .collect();
+                data.sort_unstable_by_key(|(id, _, _)| *id);
+                assert_eq!(
+                    data,
+                    vec![
+                        (1, "Alice", false),
+                        (2, "Bob", false),
+                        (3, "Carol", true),
+                        (4, "Diana", true),
+                    ]
+                )
+            }
             Ok(())
         }
 
         #[tokio::test]
-        async fn test_complex_keygen_hive_style() -> Result<()> {
-            let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+        async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
+            let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_mor();
             let hudi_table = Table::new(base_url.path()).await?;
 
             let filters = &[
@@ -891,32 +974,50 @@ mod tests {
                 Filter::try_from(("shortField", "!=", "100"))?,
             ];
             let records = hudi_table.read_snapshot(filters).await?;
-            assert_eq!(records.len(), 1);
-            assert_eq!(records[0].num_rows(), 2);
+            let all_records = 
arrow::compute::concat_batches(&records[0].schema(), &records)?;
 
-            let partition_paths = StringArray::from(
-                records[0]
-                    .column_by_name("_hoodie_partition_path")
-                    .unwrap()
-                    .to_data(),
-            );
-            let actual_partition_paths =
-                HashSet::<&str>::from_iter(partition_paths.iter().map(|s| 
s.unwrap()));
-            let expected_partition_paths = 
HashSet::from_iter(vec!["byteField=10/shortField=300"]);
-            assert_eq!(actual_partition_paths, expected_partition_paths);
+            let partition_paths = all_records
+                .column_by_name(MetaField::PartitionPath.as_ref())
+                .unwrap()
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap();
+            let ids = all_records
+                .column_by_name("id")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap();
+            let names = all_records
+                .column_by_name("name")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap();
+            let is_actives = all_records
+                .column_by_name("isActive")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<BooleanArray>()
+                .unwrap();
 
-            let file_names = StringArray::from(
-                records[0]
-                    .column_by_name("_hoodie_file_name")
-                    .unwrap()
-                    .to_data(),
+            let mut data: Vec<(&str, i32, &str, bool)> = partition_paths
+                .iter()
+                .zip(ids.iter())
+                .zip(names.iter())
+                .zip(is_actives.iter())
+                .map(|(((pt, id), name), is_active)| {
+                    (pt.unwrap(), id.unwrap(), name.unwrap(), 
is_active.unwrap())
+                })
+                .collect();
+            data.sort_unstable_by_key(|(_, id, _, _)| *id);
+            assert_eq!(
+                data,
+                vec![
+                    ("byteField=10/shortField=300", 1, "Alice", false),
+                    ("byteField=10/shortField=300", 3, "Carol", true),
+                ]
             );
-            let actual_file_names =
-                HashSet::<&str>::from_iter(file_names.iter().map(|s| 
s.unwrap()));
-            let expected_file_names: HashSet<&str> = HashSet::from_iter(vec![
-                
"a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
-            ]);
-            assert_eq!(actual_file_names, expected_file_names);
 
             Ok(())
         }
@@ -925,12 +1026,12 @@ mod tests {
     mod test_incremental_queries {
         use super::super::*;
         use arrow_array::{Array, StringArray};
-        use hudi_tests::TestTable;
+        use hudi_tests::SampleTable;
         use std::collections::HashSet;
 
         #[tokio::test]
         async fn test_empty() -> Result<()> {
-            let base_url = TestTable::V6Empty.url();
+            let base_url = SampleTable::V6Empty.url_to_cow();
             let hudi_table = Table::new(base_url.path()).await?;
 
             let records = hudi_table.read_incremental_records("0", 
None).await?;
@@ -941,7 +1042,7 @@ mod tests {
 
         #[tokio::test]
         async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> 
{
-            let base_url = 
TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
+            let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
             let hudi_table = Table::new(base_url.path()).await?;
 
             // read records changed from the first commit (exclusive) to the 
second commit (inclusive)
diff --git a/crates/core/src/table/partition.rs 
b/crates/core/src/table/partition.rs
index 0386ad2..be541c2 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -28,6 +28,8 @@ use arrow_schema::Schema;
 use std::collections::HashMap;
 use std::sync::Arc;
 
+pub const PARTITION_METAFIELD_PREFIX: &str = ".hoodie_partition_metadata";
+
 /// A partition pruner that filters partitions based on the partition path and 
its filters.
 #[derive(Debug, Clone)]
 pub struct PartitionPruner {
diff --git a/crates/core/src/timeline/instant.rs 
b/crates/core/src/timeline/instant.rs
index 96bf569..e715b6a 100644
--- a/crates/core/src/timeline/instant.rs
+++ b/crates/core/src/timeline/instant.rs
@@ -28,6 +28,7 @@ use std::str::FromStr;
 #[derive(Clone, Debug, Eq, PartialEq)]
 pub enum Action {
     Commit,
+    DeltaCommit,
     ReplaceCommit,
 }
 
@@ -37,6 +38,7 @@ impl FromStr for Action {
     fn from_str(s: &str) -> Result<Self> {
         match s {
             "commit" => Ok(Action::Commit),
+            "deltacommit" => Ok(Action::DeltaCommit),
             "replacecommit" => Ok(Action::ReplaceCommit),
             _ => Err(CoreError::Timeline(format!("Invalid action: {}", s))),
         }
@@ -47,6 +49,7 @@ impl Action {
     pub fn as_str(&self) -> &str {
         match self {
             Action::Commit => "commit",
+            Action::DeltaCommit => "deltacommit",
             Action::ReplaceCommit => "replacecommit",
         }
     }
@@ -115,11 +118,11 @@ impl Ord for Instant {
     }
 }
 
-impl TryFrom<&str> for Instant {
-    type Error = CoreError;
+impl FromStr for Instant {
+    type Err = CoreError;
 
     /// Parse a timeline file name into an [Instant]. Timezone is assumed to 
be UTC.
-    fn try_from(file_name: &str) -> Result<Self> {
+    fn from_str(file_name: &str) -> Result<Self, Self::Err> {
         Self::try_from_file_name_and_timezone(file_name, "UTC")
     }
 }
@@ -280,13 +283,13 @@ mod tests {
     #[test]
     fn test_instant_from_file_name() -> Result<()> {
         // Test completed commit
-        let instant = Instant::try_from("20240103153000.commit")?;
+        let instant = Instant::from_str("20240103153000.commit")?;
         assert_eq!(instant.timestamp, "20240103153000");
         assert_eq!(instant.action, Action::Commit);
         assert_eq!(instant.state, State::Completed);
 
         // Test inflight replacecommit with milliseconds
-        let instant = 
Instant::try_from("20240103153000123.replacecommit.inflight")?;
+        let instant = 
Instant::from_str("20240103153000123.replacecommit.inflight")?;
         assert_eq!(instant.timestamp, "20240103153000123");
         assert_eq!(instant.action, Action::ReplaceCommit);
         assert_eq!(instant.state, State::Inflight);
@@ -297,16 +300,16 @@ mod tests {
     #[test]
     fn test_invalid_file_names() {
         // Invalid timestamp format
-        assert!(Instant::try_from("2024010315.commit").is_err());
+        assert!(Instant::from_str("2024010315.commit").is_err());
 
         // Invalid action
-        assert!(Instant::try_from("20240103153000.invalid").is_err());
+        assert!(Instant::from_str("20240103153000.invalid").is_err());
 
         // Invalid state
-        assert!(Instant::try_from("20240103153000.commit.invalid").is_err());
+        assert!(Instant::from_str("20240103153000.commit.invalid").is_err());
 
         // No dot separator
-        assert!(Instant::try_from("20240103153000commit").is_err());
+        assert!(Instant::from_str("20240103153000commit").is_err());
     }
 
     #[test]
@@ -319,7 +322,7 @@ mod tests {
         ];
 
         for original_name in test_cases {
-            let instant = Instant::try_from(original_name)?;
+            let instant = Instant::from_str(original_name)?;
             assert_eq!(instant.file_name(), original_name);
         }
 
@@ -328,12 +331,12 @@ mod tests {
 
     #[test]
     fn test_instant_ordering() -> Result<()> {
-        let instant1 = Instant::try_from("20240103153000.commit")?;
-        let instant2 = Instant::try_from("20240103153000001.commit")?;
-        let instant3 = 
Instant::try_from("20240103153000999.commit.requested")?;
-        let instant4 = Instant::try_from("20240103153000999.inflight")?;
-        let instant5 = Instant::try_from("20240103153000999.commit")?;
-        let instant6 = Instant::try_from("20240103153001.commit")?;
+        let instant1 = Instant::from_str("20240103153000.commit")?;
+        let instant2 = Instant::from_str("20240103153000001.commit")?;
+        let instant3 = 
Instant::from_str("20240103153000999.commit.requested")?;
+        let instant4 = Instant::from_str("20240103153000999.inflight")?;
+        let instant5 = Instant::from_str("20240103153000999.commit")?;
+        let instant6 = Instant::from_str("20240103153001.commit")?;
 
         assert!(instant1 < instant2);
         assert!(instant2 < instant3);
@@ -346,7 +349,7 @@ mod tests {
 
     #[test]
     fn test_relative_path() {
-        let instant = Instant::try_from("20240103153000.commit").unwrap();
+        let instant = Instant::from_str("20240103153000.commit").unwrap();
         assert_eq!(
             instant.relative_path().unwrap(),
             ".hoodie/20240103153000.commit"
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 463c754..e1fc486 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -29,10 +29,11 @@ use crate::Result;
 use arrow_schema::Schema;
 use instant::Instant;
 use log::debug;
-use parquet::arrow::parquet_to_arrow_schema;
 use serde_json::{Map, Value};
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
+use std::path::PathBuf;
+use std::str::FromStr;
 use std::sync::Arc;
 
 /// A [Timeline] contains transaction logs of all actions performed on the 
table at different [Instant]s of time.
@@ -128,25 +129,55 @@ impl Timeline {
     pub async fn get_latest_schema(&self) -> Result<Schema> {
         let commit_metadata = self.get_latest_commit_metadata().await?;
 
-        let parquet_path = commit_metadata
+        let first_partition = commit_metadata
             .get("partitionToWriteStats")
-            .and_then(|v| v.as_object())
+            .and_then(|v| v.as_object());
+
+        let partition_path = first_partition
+            .and_then(|obj| obj.keys().next())
+            .ok_or_else(|| {
+                CoreError::CommitMetadata(
+                    "Failed to resolve the latest schema: no partition path 
found".to_string(),
+                )
+            })?;
+
+        let first_value = first_partition
             .and_then(|obj| obj.values().next())
             .and_then(|value| value.as_array())
-            .and_then(|arr| arr.first())
-            .and_then(|first_value| first_value["path"].as_str());
-
-        if let Some(path) = parquet_path {
-            let parquet_meta = 
self.storage.get_parquet_file_metadata(path).await?;
-
-            Ok(parquet_to_arrow_schema(
-                parquet_meta.file_metadata().schema_descr(),
-                None,
-            )?)
-        } else {
-            Err(CoreError::Timeline(
+            .and_then(|arr| arr.first());
+
+        let parquet_path = first_value.and_then(|v| v["path"].as_str());
+        match parquet_path {
+            Some(path) if path.ends_with(".parquet") => {
+                Ok(self.storage.get_parquet_file_schema(path).await?)
+            }
+            Some(_) => {
+                // TODO: properly handle deltacommit
+                let base_file = first_value
+                    .and_then(|v| v["baseFile"].as_str())
+                    .ok_or_else(|| {
+                        CoreError::CommitMetadata(
+                            "Failed to resolve the latest schema: no file path 
found".to_string(),
+                        )
+                    })?;
+                let parquet_file_path_buf = PathBuf::from_str(partition_path)
+                    .map_err(|e| {
+                        CoreError::CommitMetadata(format!(
+                            "Failed to resolve the latest schema: {}",
+                            e
+                        ))
+                    })?
+                    .join(base_file);
+                let path = parquet_file_path_buf.to_str().ok_or_else(|| {
+                    CoreError::CommitMetadata(
+                        "Failed to resolve the latest schema: invalid file 
path".to_string(),
+                    )
+                })?;
+                Ok(self.storage.get_parquet_file_schema(path).await?)
+            }
+            None => Err(CoreError::CommitMetadata(
                 "Failed to resolve the latest schema: no file path 
found".to_string(),
-            ))
+            )),
         }
     }
 
@@ -200,11 +231,12 @@ mod tests {
     use std::collections::HashMap;
     use std::fs::canonicalize;
     use std::path::Path;
+    use std::str::FromStr;
     use std::sync::Arc;
 
     use url::Url;
 
-    use hudi_tests::TestTable;
+    use hudi_tests::SampleTable;
 
     use crate::config::table::HudiTableConfig;
 
@@ -219,7 +251,7 @@ mod tests {
 
     #[tokio::test]
     async fn timeline_read_latest_schema() {
-        let base_url = TestTable::V6Nonpartitioned.url();
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let timeline = create_test_timeline(base_url).await;
         let table_schema = timeline.get_latest_schema().await.unwrap();
         assert_eq!(table_schema.fields.len(), 21)
@@ -227,14 +259,15 @@ mod tests {
 
     #[tokio::test]
     async fn timeline_read_latest_schema_from_empty_table() {
-        let base_url = TestTable::V6Empty.url();
+        let base_url = SampleTable::V6Empty.url_to_cow();
         let timeline = create_test_timeline(base_url).await;
         let table_schema = timeline.get_latest_schema().await;
         assert!(table_schema.is_err());
-        assert_eq!(
-            table_schema.err().unwrap().to_string(),
-            "Timeline error: Failed to resolve the latest schema: no file path 
found"
-        )
+        assert!(table_schema
+            .err()
+            .unwrap()
+            .to_string()
+            .starts_with("Commit metadata error: Failed to resolve the latest 
schema:"))
     }
 
     #[tokio::test]
@@ -247,8 +280,8 @@ mod tests {
         assert_eq!(
             timeline.completed_commits,
             vec![
-                Instant::try_from("20240402123035233.commit").unwrap(),
-                Instant::try_from("20240402144910683.commit").unwrap(),
+                Instant::from_str("20240402123035233.commit").unwrap(),
+                Instant::from_str("20240402144910683.commit").unwrap(),
             ]
         )
     }
@@ -263,7 +296,7 @@ mod tests {
         )
         .unwrap();
         let timeline = create_test_timeline(base_url).await;
-        let instant = Instant::try_from("20240402123035233.commit").unwrap();
+        let instant = Instant::from_str("20240402123035233.commit").unwrap();
 
         // Test error when reading empty commit metadata file
         let result = timeline.get_commit_metadata(&instant).await;
@@ -272,7 +305,7 @@ mod tests {
         assert!(matches!(err, CoreError::Timeline(_)));
         assert!(err.to_string().contains("Failed to get commit metadata"));
 
-        let instant = Instant::try_from("20240402144910683.commit").unwrap();
+        let instant = Instant::from_str("20240402144910683.commit").unwrap();
 
         // Test error when reading a commit metadata file with invalid JSON
         let result = timeline.get_commit_metadata(&instant).await;
diff --git a/crates/core/src/timeline/selector.rs 
b/crates/core/src/timeline/selector.rs
index a50d48e..e49e374 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -65,7 +65,7 @@ impl TimelineSelector {
             start_datetime,
             end_datetime,
             states: vec![State::Completed],
-            actions: vec![Action::Commit, Action::ReplaceCommit],
+            actions: vec![Action::Commit, Action::DeltaCommit, 
Action::ReplaceCommit],
             include_archived: false,
         })
     }
@@ -188,6 +188,7 @@ mod tests {
     use crate::config::HudiConfigs;
     use hudi_tests::assert_not;
     use std::collections::HashMap;
+    use std::str::FromStr;
     use std::sync::Arc;
 
     fn create_test_selector(
@@ -257,12 +258,12 @@ mod tests {
 
     async fn create_test_timeline() -> Timeline {
         let instants = vec![
-            Instant::try_from("20240103153000.commit").unwrap(),
-            Instant::try_from("20240103153010999.commit").unwrap(),
-            Instant::try_from("20240103153020999.commit.requested").unwrap(),
-            Instant::try_from("20240103153020999.inflight").unwrap(),
-            Instant::try_from("20240103153020999.commit").unwrap(),
-            Instant::try_from("20240103153030999.commit").unwrap(),
+            Instant::from_str("20240103153000.commit").unwrap(),
+            Instant::from_str("20240103153010999.commit").unwrap(),
+            Instant::from_str("20240103153020999.commit.requested").unwrap(),
+            Instant::from_str("20240103153020999.inflight").unwrap(),
+            Instant::from_str("20240103153020999.commit").unwrap(),
+            Instant::from_str("20240103153030999.commit").unwrap(),
         ];
         Timeline::new_from_completed_commits(
             Arc::new(HudiConfigs::new([(
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 0c27e74..2c14e56 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -346,12 +346,12 @@ mod tests {
 
     use datafusion::logical_expr::BinaryExpr;
     use hudi_core::config::read::HudiReadConfig::InputPartitions;
-    use hudi_tests::TestTable::{
+    use hudi_tests::SampleTable::{
         V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned, 
V6SimplekeygenHivestyleNoMetafields,
         V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
         V6TimebasedkeygenNonhivestyle,
     };
-    use hudi_tests::{utils, TestTable};
+    use hudi_tests::{utils, SampleTable};
     use utils::{get_bool_column, get_i32_column, get_str_column};
 
     use crate::HudiDataSource;
@@ -369,7 +369,7 @@ mod tests {
     #[tokio::test]
     async fn test_get_empty_schema_from_empty_table() {
         let table_provider =
-            HudiDataSource::new_with_options(V6Empty.path().as_str(), 
empty_options())
+            HudiDataSource::new_with_options(V6Empty.path_to_cow().as_str(), 
empty_options())
                 .await
                 .unwrap();
         let schema = table_provider.schema();
@@ -377,7 +377,7 @@ mod tests {
     }
 
     async fn register_test_table_with_session<I, K, V>(
-        test_table: &TestTable,
+        test_table: &SampleTable,
         options: I,
         use_sql: bool,
     ) -> Result<SessionContext, DataFusionError>
@@ -391,12 +391,12 @@ mod tests {
             let create_table_sql = format!(
                 "CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}",
                 test_table.as_ref(),
-                test_table.path(),
+                test_table.path_to_cow(),
                 concat_as_sql_options(options)
             );
             ctx.sql(create_table_sql.as_str()).await?;
         } else {
-            let base_url = test_table.url();
+            let base_url = test_table.url_to_cow();
             let hudi = HudiDataSource::new_with_options(base_url.as_str(), 
options).await?;
             ctx.register_table(test_table.as_ref(), Arc::new(hudi))?;
         }
@@ -445,7 +445,7 @@ mod tests {
             "CREATE EXTERNAL TABLE {} STORED AS {} LOCATION '{}'",
             test_table.as_ref(),
             invalid_format,
-            test_table.path()
+            test_table.path_to_cow()
         );
 
         let ctx = create_test_session().await;
@@ -556,10 +556,12 @@ mod tests {
 
     #[tokio::test]
     async fn test_supports_filters_pushdown() {
-        let table_provider =
-            HudiDataSource::new_with_options(V6Nonpartitioned.path().as_str(), 
empty_options())
-                .await
-                .unwrap();
+        let table_provider = HudiDataSource::new_with_options(
+            V6Nonpartitioned.path_to_cow().as_str(),
+            empty_options(),
+        )
+        .await
+        .unwrap();
 
         let expr0 = Expr::BinaryExpr(BinaryExpr {
             left: 
Box::new(Expr::Column(Column::from_name("name".to_string()))),
diff --git a/crates/tests/data/tables/v6_complexkeygen_hivestyle.sql 
b/crates/tests/data/tables/cow/v6_complexkeygen_hivestyle.sql
similarity index 100%
copy from crates/tests/data/tables/v6_complexkeygen_hivestyle.sql
copy to crates/tests/data/tables/cow/v6_complexkeygen_hivestyle.sql
diff --git a/crates/tests/data/tables/v6_complexkeygen_hivestyle.zip 
b/crates/tests/data/tables/cow/v6_complexkeygen_hivestyle.zip
similarity index 100%
rename from crates/tests/data/tables/v6_complexkeygen_hivestyle.zip
rename to crates/tests/data/tables/cow/v6_complexkeygen_hivestyle.zip
diff --git a/crates/tests/data/tables/v6_empty.sql 
b/crates/tests/data/tables/cow/v6_empty.sql
similarity index 100%
copy from crates/tests/data/tables/v6_empty.sql
copy to crates/tests/data/tables/cow/v6_empty.sql
diff --git a/crates/tests/data/tables/v6_empty.zip 
b/crates/tests/data/tables/cow/v6_empty.zip
similarity index 100%
rename from crates/tests/data/tables/v6_empty.zip
rename to crates/tests/data/tables/cow/v6_empty.zip
diff --git a/crates/tests/data/tables/v6_nonpartitioned.sql 
b/crates/tests/data/tables/cow/v6_nonpartitioned.sql
similarity index 100%
copy from crates/tests/data/tables/v6_nonpartitioned.sql
copy to crates/tests/data/tables/cow/v6_nonpartitioned.sql
diff --git a/crates/tests/data/tables/v6_nonpartitioned.zip 
b/crates/tests/data/tables/cow/v6_nonpartitioned.zip
similarity index 100%
rename from crates/tests/data/tables/v6_nonpartitioned.zip
rename to crates/tests/data/tables/cow/v6_nonpartitioned.zip
diff --git 
a/crates/tests/data/tables/v6_simplekeygen_hivestyle_no_metafields.sql 
b/crates/tests/data/tables/cow/v6_simplekeygen_hivestyle_no_metafields.sql
similarity index 100%
rename from crates/tests/data/tables/v6_simplekeygen_hivestyle_no_metafields.sql
rename to 
crates/tests/data/tables/cow/v6_simplekeygen_hivestyle_no_metafields.sql
diff --git 
a/crates/tests/data/tables/v6_simplekeygen_hivestyle_no_metafields.zip 
b/crates/tests/data/tables/cow/v6_simplekeygen_hivestyle_no_metafields.zip
similarity index 100%
rename from crates/tests/data/tables/v6_simplekeygen_hivestyle_no_metafields.zip
rename to 
crates/tests/data/tables/cow/v6_simplekeygen_hivestyle_no_metafields.zip
diff --git a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle.sql 
b/crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle.sql
similarity index 100%
rename from crates/tests/data/tables/v6_simplekeygen_nonhivestyle.sql
rename to crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle.sql
diff --git a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle.zip 
b/crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle.zip
similarity index 100%
rename from crates/tests/data/tables/v6_simplekeygen_nonhivestyle.zip
rename to crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle.zip
diff --git 
a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.sql 
b/crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle_overwritetable.sql
similarity index 100%
rename from 
crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.sql
rename to 
crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle_overwritetable.sql
diff --git 
a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.zip 
b/crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle_overwritetable.zip
similarity index 100%
rename from 
crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.zip
rename to 
crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle_overwritetable.zip
diff --git a/crates/tests/data/tables/v6_timebasedkeygen_nonhivestyle.sql 
b/crates/tests/data/tables/cow/v6_timebasedkeygen_nonhivestyle.sql
similarity index 100%
rename from crates/tests/data/tables/v6_timebasedkeygen_nonhivestyle.sql
rename to crates/tests/data/tables/cow/v6_timebasedkeygen_nonhivestyle.sql
diff --git a/crates/tests/data/tables/v6_timebasedkeygen_nonhivestyle.zip 
b/crates/tests/data/tables/cow/v6_timebasedkeygen_nonhivestyle.zip
similarity index 100%
rename from crates/tests/data/tables/v6_timebasedkeygen_nonhivestyle.zip
rename to crates/tests/data/tables/cow/v6_timebasedkeygen_nonhivestyle.zip
diff --git a/crates/tests/data/tables/v6_complexkeygen_hivestyle.sql 
b/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.sql
similarity index 93%
rename from crates/tests/data/tables/v6_complexkeygen_hivestyle.sql
rename to crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.sql
index 77a1fa7..64f2efd 100644
--- a/crates/tests/data/tables/v6_complexkeygen_hivestyle.sql
+++ b/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.sql
@@ -43,12 +43,17 @@ CREATE TABLE v6_complexkeygen_hivestyle (
                                             shortField SHORT
 )
     USING HUDI
+    LOCATION '/opt/data/external_tables/v6_complexkeygen_hivestyle'
 TBLPROPERTIES (
-    type = 'cow',
+    type = 'mor',
     primaryKey = 'id,name',
     preCombineField = 'longField',
     'hoodie.metadata.enable' = 'false',
-    'hoodie.datasource.write.hive_style_partitioning' = 'true'
+    'hoodie.datasource.write.hive_style_partitioning' = 'true',
+    'hoodie.table.log.file.format' = 'PARQUET',
+    'hoodie.logfile.data.block.format' = 'parquet',
+    'hoodie.datasource.write.record.merger.impls' = 
'org.apache.hudi.HoodieSparkRecordMerger',
+    'hoodie.parquet.small.file.limit' = '0'
 )
 PARTITIONED BY (byteField, shortField);
 
diff --git a/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.zip 
b/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.zip
new file mode 100644
index 0000000..822728f
Binary files /dev/null and 
b/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.zip differ
diff --git a/crates/tests/data/tables/v6_empty.sql 
b/crates/tests/data/tables/mor/v6_empty.sql
similarity index 97%
rename from crates/tests/data/tables/v6_empty.sql
rename to crates/tests/data/tables/mor/v6_empty.sql
index 6db4624..7c6a7db 100644
--- a/crates/tests/data/tables/v6_empty.sql
+++ b/crates/tests/data/tables/mor/v6_empty.sql
@@ -24,7 +24,7 @@ create table v6_empty (
 )
     USING HUDI
     TBLPROPERTIES (
-        type = 'cow',
+        type = 'mor',
         primaryKey = 'id',
         'hoodie.metadata.enable' = 'false'
 );
diff --git a/crates/tests/data/tables/mor/v6_empty.zip 
b/crates/tests/data/tables/mor/v6_empty.zip
new file mode 100644
index 0000000..6a8e90a
Binary files /dev/null and b/crates/tests/data/tables/mor/v6_empty.zip differ
diff --git a/crates/tests/data/tables/v6_nonpartitioned.sql 
b/crates/tests/data/tables/mor/v6_nonpartitioned.sql
similarity index 94%
rename from crates/tests/data/tables/v6_nonpartitioned.sql
rename to crates/tests/data/tables/mor/v6_nonpartitioned.sql
index d581dfa..32ff4c8 100644
--- a/crates/tests/data/tables/v6_nonpartitioned.sql
+++ b/crates/tests/data/tables/mor/v6_nonpartitioned.sql
@@ -44,10 +44,14 @@ CREATE TABLE v6_nonpartitioned (
 )
     USING HUDI
 TBLPROPERTIES (
-    type = 'cow',
+    type = 'mor',
     primaryKey = 'id',
     preCombineField = 'longField',
-    'hoodie.metadata.enable' = 'false'
+    'hoodie.metadata.enable' = 'false',
+    'hoodie.table.log.file.format' = 'PARQUET',
+    'hoodie.logfile.data.block.format' = 'parquet',
+    'hoodie.datasource.write.record.merger.impls' = 
'org.apache.hudi.HoodieSparkRecordMerger',
+    'hoodie.parquet.small.file.limit' = '0'
 );
 
 INSERT INTO v6_nonpartitioned VALUES
diff --git a/crates/tests/data/tables/mor/v6_nonpartitioned.zip 
b/crates/tests/data/tables/mor/v6_nonpartitioned.zip
new file mode 100644
index 0000000..c4eb63e
Binary files /dev/null and b/crates/tests/data/tables/mor/v6_nonpartitioned.zip 
differ
diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs
index 457ae4e..a7b1e7b 100644
--- a/crates/tests/src/lib.rs
+++ b/crates/tests/src/lib.rs
@@ -36,7 +36,7 @@ pub fn extract_test_table(zip_path: &Path) -> PathBuf {
 
 #[derive(Debug, EnumString, AsRefStr, EnumIter)]
 #[strum(serialize_all = "snake_case")]
-pub enum TestTable {
+pub enum SampleTable {
     V6ComplexkeygenHivestyle,
     V6Empty,
     V6Nonpartitioned,
@@ -46,23 +46,35 @@ pub enum TestTable {
     V6TimebasedkeygenNonhivestyle,
 }
 
-impl TestTable {
-    pub fn zip_path(&self) -> Box<Path> {
+impl SampleTable {
+    fn zip_path(&self, table_type: &str) -> Box<Path> {
         let dir = env!("CARGO_MANIFEST_DIR");
         let data_path = Path::new(dir)
             .join("data/tables")
+            .join(table_type.to_lowercase())
             .join(format!("{}.zip", self.as_ref()));
         data_path.into_boxed_path()
     }
 
-    pub fn path(&self) -> String {
-        let zip_path = self.zip_path();
+    pub fn path_to_cow(&self) -> String {
+        let zip_path = self.zip_path("cow");
         let path_buf = 
extract_test_table(zip_path.as_ref()).join(self.as_ref());
         path_buf.to_str().unwrap().to_string()
     }
 
-    pub fn url(&self) -> Url {
-        let path = self.path();
+    pub fn path_to_mor(&self) -> String {
+        let zip_path = self.zip_path("mor");
+        let path_buf = 
extract_test_table(zip_path.as_ref()).join(self.as_ref());
+        path_buf.to_str().unwrap().to_string()
+    }
+
+    pub fn url_to_cow(&self) -> Url {
+        let path = self.path_to_cow();
+        Url::from_file_path(path).unwrap()
+    }
+
+    pub fn url_to_mor(&self) -> Url {
+        let path = self.path_to_mor();
         Url::from_file_path(path).unwrap()
     }
 }
@@ -71,12 +83,12 @@ impl TestTable {
 mod tests {
     use strum::IntoEnumIterator;
 
-    use crate::TestTable;
+    use crate::SampleTable;
 
     #[test]
-    fn test_table_zip_file_should_exist() {
-        for t in TestTable::iter() {
-            let path = t.zip_path();
+    fn sample_table_zip_file_should_exist() {
+        for t in SampleTable::iter() {
+            let path = t.zip_path("cow");
             assert!(path.exists());
             assert!(path.is_file());
         }
diff --git a/demo/app/python/src/main.py b/demo/app/python/src/main.py
index 8c08bf6..cc87068 100644
--- a/demo/app/python/src/main.py
+++ b/demo/app/python/src/main.py
@@ -18,35 +18,37 @@
 from hudi import HudiTableBuilder
 import pyarrow as pa
 
-hudi_table = HudiTableBuilder.from_base_uri(
-    "s3://hudi-demo/v6_complexkeygen_hivestyle"
-).build()
-records = hudi_table.read_snapshot()
+for url in [
+    "s3://hudi-demo/cow/v6_complexkeygen_hivestyle",
+    "s3://hudi-demo/mor/v6_complexkeygen_hivestyle",
+]:
+    hudi_table = HudiTableBuilder.from_base_uri(url).build()
+    records = hudi_table.read_snapshot()
 
-arrow_table = pa.Table.from_batches(records)
-assert arrow_table.schema.names == [
-    "_hoodie_commit_time",
-    "_hoodie_commit_seqno",
-    "_hoodie_record_key",
-    "_hoodie_partition_path",
-    "_hoodie_file_name",
-    "id",
-    "name",
-    "isActive",
-    "intField",
-    "longField",
-    "floatField",
-    "doubleField",
-    "decimalField",
-    "dateField",
-    "timestampField",
-    "binaryField",
-    "arrayField",
-    "mapField",
-    "structField",
-    "byteField",
-    "shortField",
-]
-assert arrow_table.num_rows == 4
+    arrow_table = pa.Table.from_batches(records)
+    assert arrow_table.schema.names == [
+        "_hoodie_commit_time",
+        "_hoodie_commit_seqno",
+        "_hoodie_record_key",
+        "_hoodie_partition_path",
+        "_hoodie_file_name",
+        "id",
+        "name",
+        "isActive",
+        "intField",
+        "longField",
+        "floatField",
+        "doubleField",
+        "decimalField",
+        "dateField",
+        "timestampField",
+        "binaryField",
+        "arrayField",
+        "mapField",
+        "structField",
+        "byteField",
+        "shortField",
+    ]
+    assert arrow_table.num_rows == 4
 
 print("Python API: read snapshot successfully!")
diff --git a/demo/app/rust/src/main.rs b/demo/app/rust/src/main.rs
index 9b5ea69..1bfbfd3 100644
--- a/demo/app/rust/src/main.rs
+++ b/demo/app/rust/src/main.rs
@@ -26,9 +26,9 @@ use hudi::HudiDataSource;
 #[tokio::main]
 async fn main() -> Result<()> {
     let ctx = SessionContext::new();
-    let hudi = 
HudiDataSource::new("s3://hudi-demo/v6_complexkeygen_hivestyle").await?;
-    ctx.register_table("v6_table", Arc::new(hudi))?;
-    let df: DataFrame = ctx.sql("SELECT * from v6_table").await?;
+    let hudi = 
HudiDataSource::new("s3://hudi-demo/cow/v6_complexkeygen_hivestyle").await?;
+    ctx.register_table("cow_v6_table", Arc::new(hudi))?;
+    let df: DataFrame = ctx.sql("SELECT * from cow_v6_table").await?;
     assert!(
         df.schema()
             .columns()
diff --git a/demo/infra/mc/prepare_data.sh b/demo/infra/mc/prepare_data.sh
index 2158b98..c432590 100755
--- a/demo/infra/mc/prepare_data.sh
+++ b/demo/infra/mc/prepare_data.sh
@@ -24,8 +24,11 @@ mc alias set local http://minio:9000 "$MINIO_ROOT_USER" 
"$MINIO_ROOT_PASSWORD"
 mc mb local/hudi-demo
 
 # unzip the data
-mkdir /tmp/tables
-for zip in /opt/data/tables/*.zip; do unzip -o "$zip" -d "/tmp/tables/"; done
+mkdir -p /tmp/tables/cow/
+for zip in /opt/data/tables/cow/*.zip; do unzip -o "$zip" -d 
"/tmp/tables/cow/"; done
+mkdir -p /tmp/tables/mor/
+for zip in /opt/data/tables/mor/*.zip; do unzip -o "$zip" -d 
"/tmp/tables/mor/"; done
 
 # copy the data to the bucket
-mc cp -r /tmp/tables/* local/hudi-demo/
+mc cp -r /tmp/tables/cow/* local/hudi-demo/cow/
+mc cp -r /tmp/tables/mor/* local/hudi-demo/mor/
diff --git a/python/src/internal.rs b/python/src/internal.rs
index e11b21a..b2a667b 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -27,8 +27,8 @@ use tokio::runtime::Runtime;
 
 use hudi::error::CoreError;
 use hudi::expr::filter::Filter;
+use hudi::file_group::file_slice::FileSlice;
 use hudi::file_group::reader::FileGroupReader;
-use hudi::file_group::FileSlice;
 use hudi::storage::error::StorageError;
 use hudi::table::builder::TableBuilder;
 use hudi::table::Table;
@@ -128,7 +128,7 @@ fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
     let file_id = f.file_id().to_string();
     let partition_path = f.partition_path().to_string();
     let creation_instant_time = f.creation_instant_time().to_string();
-    let base_file_name = f.base_file.file_name.clone();
+    let base_file_name = f.base_file.file_name();
     let file_metadata = f.base_file.file_metadata.clone().unwrap_or_default();
     let base_file_size = file_metadata.size;
     let base_file_byte_size = file_metadata.byte_size;

Reply via email to