This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 3dc5fe2 feat: add APIs for MOR snapshot reads (#247)
3dc5fe2 is described below
commit 3dc5fe239c8038d61567e8085828f98eba1c0584
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 17 23:59:27 2025 -0600
feat: add APIs for MOR snapshot reads (#247)
- Make `FileGroup` and `FileSlice` support adding and reading log files
- Perform snapshot read using `FileGroupReader`, which merges log files
with base files when required
- Make `BaseFile` and `LogFile` APIs more ergonomic
---
crates/core/src/config/table.rs | 3 +
crates/core/src/file_group/base_file.rs | 88 ++++++---
crates/core/src/file_group/file_slice.rs | 97 ++++++++++
crates/core/src/file_group/log_file/log_block.rs | 1 +
crates/core/src/file_group/log_file/mod.rs | 169 +++++++++++++---
crates/core/src/file_group/log_file/reader.rs | 14 +-
crates/core/src/file_group/mod.rs | 123 +++++-------
crates/core/src/file_group/reader.rs | 104 ++++++++--
crates/core/src/storage/mod.rs | 13 +-
crates/core/src/table/builder.rs | 13 +-
crates/core/src/table/fs_view.rs | 122 ++++++++----
crates/core/src/table/mod.rs | 215 +++++++++++++++------
crates/core/src/table/partition.rs | 2 +
crates/core/src/timeline/instant.rs | 37 ++--
crates/core/src/timeline/mod.rs | 87 ++++++---
crates/core/src/timeline/selector.rs | 15 +-
crates/datafusion/src/lib.rs | 24 +--
.../{ => cow}/v6_complexkeygen_hivestyle.sql | 0
.../{ => cow}/v6_complexkeygen_hivestyle.zip | Bin
crates/tests/data/tables/{ => cow}/v6_empty.sql | 0
crates/tests/data/tables/{ => cow}/v6_empty.zip | Bin
.../data/tables/{ => cow}/v6_nonpartitioned.sql | 0
.../data/tables/{ => cow}/v6_nonpartitioned.zip | Bin
.../v6_simplekeygen_hivestyle_no_metafields.sql | 0
.../v6_simplekeygen_hivestyle_no_metafields.zip | Bin
.../{ => cow}/v6_simplekeygen_nonhivestyle.sql | 0
.../{ => cow}/v6_simplekeygen_nonhivestyle.zip | Bin
...v6_simplekeygen_nonhivestyle_overwritetable.sql | 0
...v6_simplekeygen_nonhivestyle_overwritetable.zip | Bin
.../{ => cow}/v6_timebasedkeygen_nonhivestyle.sql | 0
.../{ => cow}/v6_timebasedkeygen_nonhivestyle.zip | Bin
.../{ => mor}/v6_complexkeygen_hivestyle.sql | 9 +-
.../data/tables/mor/v6_complexkeygen_hivestyle.zip | Bin 0 -> 42137 bytes
crates/tests/data/tables/{ => mor}/v6_empty.sql | 2 +-
crates/tests/data/tables/mor/v6_empty.zip | Bin 0 -> 5885 bytes
.../data/tables/{ => mor}/v6_nonpartitioned.sql | 8 +-
crates/tests/data/tables/mor/v6_nonpartitioned.zip | Bin 0 -> 28459 bytes
crates/tests/src/lib.rs | 34 ++--
demo/app/python/src/main.py | 60 +++---
demo/app/rust/src/main.rs | 6 +-
demo/infra/mc/prepare_data.sh | 9 +-
python/src/internal.rs | 4 +-
42 files changed, 887 insertions(+), 372 deletions(-)
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 7101971..9915626 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -146,6 +146,9 @@ impl ConfigParser for HudiTableConfig {
fn default_value(&self) -> Option<Self::Output> {
match self {
+ Self::BaseFileFormat => Some(HudiConfigValue::String(
+ BaseFileFormatValue::Parquet.as_ref().to_string(),
+ )),
Self::DatabaseName =>
Some(HudiConfigValue::String("default".to_string())),
Self::DropsPartitionFields =>
Some(HudiConfigValue::Boolean(false)),
Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
diff --git a/crates/core/src/file_group/base_file.rs
b/crates/core/src/file_group/base_file.rs
index 7c4fcfb..8d51969 100644
--- a/crates/core/src/file_group/base_file.rs
+++ b/crates/core/src/file_group/base_file.rs
@@ -19,52 +19,85 @@
use crate::error::CoreError;
use crate::storage::file_metadata::FileMetadata;
use crate::Result;
+use std::str::FromStr;
/// Hudi Base file, part of a [FileSlice].
#[derive(Clone, Debug)]
pub struct BaseFile {
- /// The file name of the base file.
- pub file_name: String,
-
- /// The id of the enclosing file group.
+ /// The id of the enclosing [FileGroup].
pub file_id: String,
- /// The associated instant time of the base file.
- pub instant_time: String,
+ /// Monotonically increasing token for every attempt to write the
[BaseFile].
+ pub write_token: String,
+
+ /// The timestamp of the commit instant in the Timeline that created the
[BaseFile].
+ pub commit_timestamp: String,
+
+ /// File extension that matches to
[crate::config::table::HudiTableConfig::BaseFileFormat].
+ ///
+ /// See also [crate::config::table::BaseFileFormatValue].
+ pub extension: String,
/// The metadata about the file.
pub file_metadata: Option<FileMetadata>,
}
impl BaseFile {
- /// Parse file name and extract `file_id` and `instant_time`.
- fn parse_file_name(file_name: &str) -> Result<(String, String)> {
+ /// Parse a base file's name into parts.
+ ///
+ /// File name format:
+ ///
+ /// ```text
+ /// [File Id]_[File Write Token]_[Commit timestamp].[File Extension]
+ /// ```
+ fn parse_file_name(file_name: &str) -> Result<(String, String, String,
String)> {
let err_msg = format!("Failed to parse file name '{file_name}' for
base file.");
- let (name, _) = file_name
+ let (stem, extension) = file_name
.rsplit_once('.')
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
- let parts: Vec<&str> = name.split('_').collect();
+ let parts: Vec<&str> = stem.split('_').collect();
let file_id = parts
.first()
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
.to_string();
- let instant_time = parts
+ let write_token = parts
+ .get(1)
+ .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
+ .to_string();
+ let commit_timestamp = parts
.get(2)
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
.to_string();
- Ok((file_id, instant_time))
+ Ok((
+ file_id,
+ write_token,
+ commit_timestamp,
+ extension.to_string(),
+ ))
+ }
+
+ #[inline]
+ pub fn file_name(&self) -> String {
+ format!(
+ "{file_id}_{write_token}_{commit_timestamp}.{extension}",
+ file_id = self.file_id,
+ write_token = self.write_token,
+ commit_timestamp = self.commit_timestamp,
+ extension = self.extension,
+ )
}
}
-impl TryFrom<&str> for BaseFile {
- type Error = CoreError;
+impl FromStr for BaseFile {
+ type Err = CoreError;
- fn try_from(file_name: &str) -> Result<Self> {
- let (file_id, instant_time) = Self::parse_file_name(file_name)?;
+ fn from_str(file_name: &str) -> Result<Self, Self::Err> {
+ let (file_id, write_token, commit_timestamp, extension) =
Self::parse_file_name(file_name)?;
Ok(Self {
- file_name: file_name.to_string(),
file_id,
- instant_time,
+ write_token,
+ commit_timestamp,
+ extension,
file_metadata: None,
})
}
@@ -74,12 +107,13 @@ impl TryFrom<FileMetadata> for BaseFile {
type Error = CoreError;
fn try_from(metadata: FileMetadata) -> Result<Self> {
- let file_name = metadata.name.clone();
- let (file_id, instant_time) = Self::parse_file_name(&file_name)?;
+ let file_name = metadata.name.as_str();
+ let (file_id, write_token, commit_timestamp, extension) =
Self::parse_file_name(file_name)?;
Ok(Self {
- file_name,
file_id,
- instant_time,
+ write_token,
+ commit_timestamp,
+ extension,
file_metadata: Some(metadata),
})
}
@@ -93,9 +127,9 @@ mod tests {
#[test]
fn test_create_base_file_from_file_name() {
let file_name =
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
- let base_file = BaseFile::try_from(file_name).unwrap();
+ let base_file = BaseFile::from_str(file_name).unwrap();
assert_eq!(base_file.file_id,
"5a226868-2934-4f84-a16f-55124630c68d-0");
- assert_eq!(base_file.instant_time, "20240402144910683");
+ assert_eq!(base_file.commit_timestamp, "20240402144910683");
assert!(base_file.file_metadata.is_none());
}
@@ -107,7 +141,7 @@ mod tests {
);
let base_file = BaseFile::try_from(metadata).unwrap();
assert_eq!(base_file.file_id,
"5a226868-2934-4f84-a16f-55124630c68d-0");
- assert_eq!(base_file.instant_time, "20240402144910683");
+ assert_eq!(base_file.commit_timestamp, "20240402144910683");
let file_metadata = base_file.file_metadata.unwrap();
assert_eq!(file_metadata.size, 1024);
assert_not!(file_metadata.fully_populated);
@@ -115,10 +149,10 @@ mod tests {
#[test]
fn create_a_base_file_returns_error() {
- let result = BaseFile::try_from("no_file_extension");
+ let result = BaseFile::from_str("no_file_extension");
assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
- let result = BaseFile::try_from(".parquet");
+ let result = BaseFile::from_str(".parquet");
assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
let metadata = FileMetadata::new("no-valid-delimiter.parquet", 1024);
diff --git a/crates/core/src/file_group/file_slice.rs
b/crates/core/src/file_group/file_slice.rs
new file mode 100644
index 0000000..c8ca359
--- /dev/null
+++ b/crates/core/src/file_group/file_slice.rs
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::file_group::base_file::BaseFile;
+use crate::file_group::log_file::LogFile;
+use crate::storage::Storage;
+use crate::Result;
+use std::collections::BTreeSet;
+use std::path::PathBuf;
+
+/// Within a [crate::file_group::FileGroup],
+/// a [FileSlice] is a logical group of [BaseFile] and [LogFile]s.
+#[derive(Clone, Debug)]
+pub struct FileSlice {
+ pub base_file: BaseFile,
+ pub log_files: BTreeSet<LogFile>,
+ pub partition_path: Option<String>,
+}
+
+impl FileSlice {
+ pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
+ Self {
+ base_file,
+ log_files: BTreeSet::new(),
+ partition_path,
+ }
+ }
+
+ fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
+ let path = PathBuf::from(self.partition_path()).join(file_name);
+ path.to_str().map(|s| s.to_string()).ok_or_else(|| {
+ CoreError::FileGroup(format!("Failed to get relative path for
file: {file_name}",))
+ })
+ }
+
+ /// Returns the relative path of the [BaseFile] in the [FileSlice].
+ pub fn base_file_relative_path(&self) -> Result<String> {
+ let file_name = &self.base_file.file_name();
+ self.relative_path_for_file(file_name)
+ }
+
+ /// Returns the relative path of the given [LogFile] in the [FileSlice].
+ pub fn log_file_relative_path(&self, log_file: &LogFile) -> Result<String>
{
+ let file_name = &log_file.file_name();
+ self.relative_path_for_file(file_name)
+ }
+
+ /// Returns the enclosing [FileGroup]'s id.
+ #[inline]
+ pub fn file_id(&self) -> &str {
+ &self.base_file.file_id
+ }
+
+ /// Returns the partition path of the [FileSlice].
+ #[inline]
+ pub fn partition_path(&self) -> &str {
+ self.partition_path.as_deref().unwrap_or_default()
+ }
+
+ /// Returns the instant time that marks the [FileSlice] creation.
+ ///
+ /// This is also an instant time stored in the [Timeline].
+ #[inline]
+ pub fn creation_instant_time(&self) -> &str {
+ &self.base_file.commit_timestamp
+ }
+
+ /// Load [FileMetadata] from storage layer for the [BaseFile] if
`file_metadata` is [None]
+ /// or if `file_metadata` is not fully populated.
+ pub async fn load_metadata_if_needed(&mut self, storage: &Storage) ->
Result<()> {
+ if let Some(metadata) = &self.base_file.file_metadata {
+ if metadata.fully_populated {
+ return Ok(());
+ }
+ }
+ let relative_path = self.base_file_relative_path()?;
+ let fetched_metadata =
storage.get_file_metadata(&relative_path).await?;
+ self.base_file.file_metadata = Some(fetched_metadata);
+ Ok(())
+ }
+}
diff --git a/crates/core/src/file_group/log_file/log_block.rs
b/crates/core/src/file_group/log_file/log_block.rs
index 387b709..ab59a40 100644
--- a/crates/core/src/file_group/log_file/log_block.rs
+++ b/crates/core/src/file_group/log_file/log_block.rs
@@ -129,6 +129,7 @@ impl TryFrom<[u8; 4]> for BlockMetadataKey {
}
}
+#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct LogBlock {
pub format_version: LogFormatVersion,
diff --git a/crates/core/src/file_group/log_file/mod.rs
b/crates/core/src/file_group/log_file/mod.rs
index 46dd403..2012861 100644
--- a/crates/core/src/file_group/log_file/mod.rs
+++ b/crates/core/src/file_group/log_file/mod.rs
@@ -17,35 +17,37 @@
* under the License.
*/
use crate::error::CoreError;
+use crate::storage::file_metadata::FileMetadata;
use crate::Result;
+use std::cmp::Ordering;
use std::str::FromStr;
mod log_block;
mod log_format;
pub mod reader;
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug)]
pub struct LogFile {
- file_id: String,
- base_commit_timestamp: String,
- log_file_extension: String,
- log_file_version: String,
- file_write_token: String,
+ pub file_id: String,
+ pub base_commit_timestamp: String,
+ pub extension: String,
+ pub version: String,
+ pub write_token: String,
+ pub file_metadata: Option<FileMetadata>,
}
const LOG_FILE_PREFIX: char = '.';
-impl FromStr for LogFile {
- type Err = CoreError;
-
- /// Parse a log file name into a [LogFile].
+impl LogFile {
+ /// Parse a log file's name into parts.
///
/// File name format:
///
/// ```text
/// .[File Id]_[Base Commit Timestamp].[Log File Extension].[Log File
Version]_[File Write Token]
/// ```
- fn from_str(file_name: &str) -> Result<Self, Self::Err> {
+ /// TODO support `.cdc` suffix
+ fn parse_file_name(file_name: &str) -> Result<(String, String, String,
String, String)> {
let err_msg = format!("Failed to parse file name '{file_name}' for log
file.");
if !file_name.starts_with(LOG_FILE_PREFIX) {
@@ -80,32 +82,90 @@ impl FromStr for LogFile {
return Err(CoreError::FileGroup(err_msg.clone()));
}
- Ok(LogFile {
- file_id: file_id.to_string(),
- base_commit_timestamp: base_commit_timestamp.to_string(),
- log_file_extension: log_file_extension.to_string(),
- log_file_version: log_file_version.to_string(),
- file_write_token: file_write_token.to_string(),
- })
+ Ok((
+ file_id.to_string(),
+ base_commit_timestamp.to_string(),
+ log_file_extension.to_string(),
+ log_file_version.to_string(),
+ file_write_token.to_string(),
+ ))
}
-}
-impl LogFile {
- /// Returns the file name of the log file.
#[inline]
pub fn file_name(&self) -> String {
format!(
-
"{prefix}{file_id}_{base_commit_timestamp}.{log_file_extension}.{log_file_version}_{file_write_token}",
+
"{prefix}{file_id}_{base_commit_timestamp}.{extension}.{version}_{write_token}",
prefix = LOG_FILE_PREFIX,
file_id = self.file_id,
base_commit_timestamp = self.base_commit_timestamp,
- log_file_extension = self.log_file_extension,
- log_file_version = self.log_file_version,
- file_write_token = self.file_write_token
+ extension = self.extension,
+ version = self.version,
+ write_token = self.write_token
)
}
}
+impl FromStr for LogFile {
+ type Err = CoreError;
+
+ /// Parse a log file name into a [LogFile].
+ fn from_str(file_name: &str) -> Result<Self, Self::Err> {
+ let (file_id, base_commit_timestamp, extension, version, write_token) =
+ Self::parse_file_name(file_name)?;
+ Ok(LogFile {
+ file_id,
+ base_commit_timestamp,
+ extension,
+ version,
+ write_token,
+ file_metadata: None,
+ })
+ }
+}
+
+impl TryFrom<FileMetadata> for LogFile {
+ type Error = CoreError;
+
+ fn try_from(metadata: FileMetadata) -> Result<Self> {
+ let file_name = metadata.name.as_str();
+ let (file_id, base_commit_timestamp, extension, version, write_token) =
+ Self::parse_file_name(file_name)?;
+ Ok(LogFile {
+ file_id,
+ base_commit_timestamp,
+ extension,
+ version,
+ write_token,
+ file_metadata: Some(metadata),
+ })
+ }
+}
+
+impl PartialEq for LogFile {
+ fn eq(&self, other: &Self) -> bool {
+ self.file_name() == other.file_name()
+ }
+}
+
+impl Eq for LogFile {}
+
+impl PartialOrd for LogFile {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl Ord for LogFile {
+ fn cmp(&self, other: &Self) -> Ordering {
+ // Compare fields in order: base_commit_timestamp, version, write_token
+ // TODO support `.cdc` suffix
+ self.base_commit_timestamp
+ .cmp(&other.base_commit_timestamp)
+ .then(self.version.cmp(&other.version))
+ .then(self.write_token.cmp(&other.write_token))
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -117,9 +177,9 @@ mod tests {
assert_eq!(log_file.file_id, "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0");
assert_eq!(log_file.base_commit_timestamp, "20250109233025121");
- assert_eq!(log_file.log_file_extension, "log");
- assert_eq!(log_file.log_file_version, "1");
- assert_eq!(log_file.file_write_token, "0-51-115");
+ assert_eq!(log_file.extension, "log");
+ assert_eq!(log_file.version, "1");
+ assert_eq!(log_file.write_token, "0-51-115");
}
#[test]
@@ -182,4 +242,57 @@ mod tests {
));
}
}
+
+ #[test]
+ fn test_log_file_ordering() {
+ // Same timestamp, different version
+ let log1 = LogFile {
+ file_id: "ee2ace10-7667-40f5-9848-0a144b5ea064-0".to_string(),
+ base_commit_timestamp: "20250113230302428".to_string(),
+ extension: "log".to_string(),
+ version: "1".to_string(),
+ write_token: "0-188-387".to_string(),
+ file_metadata: None,
+ };
+
+ let log2 = LogFile {
+ file_id: "ee2ace10-7667-40f5-9848-0a144b5ea064-0".to_string(),
+ base_commit_timestamp: "20250113230302428".to_string(),
+ extension: "log".to_string(),
+ version: "2".to_string(),
+ write_token: "0-188-387".to_string(),
+ file_metadata: None,
+ };
+
+ // Different timestamp
+ let log3 = LogFile {
+ file_id: "ee2ace10-7667-40f5-9848-0a144b5ea064-0".to_string(),
+ base_commit_timestamp: "20250113230424191".to_string(),
+ extension: "log".to_string(),
+ version: "1".to_string(),
+ write_token: "0-188-387".to_string(),
+ file_metadata: None,
+ };
+
+ // Same timestamp and version, different write token
+ let log4 = LogFile {
+ file_id: "ee2ace10-7667-40f5-9848-0a144b5ea064-0".to_string(),
+ base_commit_timestamp: "20250113230302428".to_string(),
+ extension: "log".to_string(),
+ version: "1".to_string(),
+ write_token: "1-188-387".to_string(),
+ file_metadata: None,
+ };
+
+ // Test ordering
+ assert!(log1 < log2, "version ordering failed"); // version comparison
+ assert!(log1 < log3, "timestamp ordering failed"); // timestamp
comparison
+ assert!(log2 < log3, "timestamp ordering failed"); // timestamp
comparison
+ assert!(log1 < log4, "write token ordering failed"); // write token
comparison
+
+ // Test sorting a vector of log files
+ let mut logs = vec![log3.clone(), log4.clone(), log1.clone(),
log2.clone()];
+ logs.sort();
+ assert_eq!(logs, vec![log1, log4, log2, log3]);
+ }
}
diff --git a/crates/core/src/file_group/log_file/reader.rs
b/crates/core/src/file_group/log_file/reader.rs
index 84b848b..8a0d439 100644
--- a/crates/core/src/file_group/log_file/reader.rs
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -25,6 +25,7 @@ use
crate::file_group::log_file::log_format::{LogFormatVersion, MAGIC};
use crate::storage::reader::StorageReader;
use crate::storage::Storage;
use crate::Result;
+use arrow_array::RecordBatch;
use bytes::BytesMut;
use std::collections::HashMap;
use std::io::{self, Read, Seek};
@@ -50,13 +51,22 @@ impl LogFileReader<StorageReader> {
})
}
- pub fn read_all_blocks(mut self) -> Result<Vec<LogBlock>> {
+ fn read_all_blocks(&mut self) -> Result<Vec<LogBlock>> {
let mut blocks = Vec::new();
while let Some(block) = self.read_next_block()? {
blocks.push(block);
}
Ok(blocks)
}
+
+ pub fn read_all_records_unmerged(&mut self) -> Result<Vec<RecordBatch>> {
+ let all_blocks = self.read_all_blocks()?;
+ let mut batches = Vec::new();
+ for block in all_blocks {
+ batches.extend_from_slice(&block.record_batches);
+ }
+ Ok(batches)
+ }
}
impl<R: Read + Seek> LogFileReader<R> {
@@ -256,7 +266,7 @@ mod tests {
let (dir, file_name) = get_sample_log_file();
let dir_url = Url::from_directory_path(dir).unwrap();
let storage = Storage::new_with_base_url(dir_url).unwrap();
- let reader = LogFileReader::new(storage, &file_name).await.unwrap();
+ let mut reader = LogFileReader::new(storage,
&file_name).await.unwrap();
let blocks = reader.read_all_blocks().unwrap();
assert_eq!(blocks.len(), 1);
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index fce9786..7d3bcc7 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -22,82 +22,20 @@
pub mod base_file;
pub mod builder;
+pub mod file_slice;
pub mod log_file;
pub mod reader;
use crate::error::CoreError;
use crate::file_group::base_file::BaseFile;
-use crate::storage::Storage;
+use crate::file_group::log_file::LogFile;
use crate::Result;
+use file_slice::FileSlice;
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Formatter;
use std::hash::{Hash, Hasher};
-use std::path::PathBuf;
-
-/// Within a [FileGroup], a [FileSlice] is a logical group of [BaseFile] and
log files.
-///
-/// [note] The log files are not yet supported.
-#[derive(Clone, Debug)]
-pub struct FileSlice {
- pub base_file: BaseFile,
- pub partition_path: Option<String>,
-}
-
-impl FileSlice {
- pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
- Self {
- base_file,
- partition_path,
- }
- }
-
- /// Returns the relative path of the base file.
- pub fn base_file_relative_path(&self) -> Result<String> {
- let file_name = &self.base_file.file_name;
- let path = PathBuf::from(self.partition_path()).join(file_name);
- path.to_str().map(|s| s.to_string()).ok_or_else(|| {
- CoreError::FileGroup(format!(
- "Failed to get base file relative path for file slice: {:?}",
- self
- ))
- })
- }
-
- /// Returns the enclosing [FileGroup]'s id.
- #[inline]
- pub fn file_id(&self) -> &str {
- &self.base_file.file_id
- }
-
- /// Returns the partition path of the [FileSlice].
- #[inline]
- pub fn partition_path(&self) -> &str {
- self.partition_path.as_deref().unwrap_or_default()
- }
-
- /// Returns the instant time that marks the [FileSlice] creation.
- ///
- /// This is also an instant time stored in the [Timeline].
- #[inline]
- pub fn creation_instant_time(&self) -> &str {
- &self.base_file.instant_time
- }
-
- /// Load [FileMetadata] from storage layer for the [BaseFile] if
`file_metadata` is [None]
- /// or if `file_metadata` is not fully populated.
- pub async fn load_metadata_if_needed(&mut self, storage: &Storage) ->
Result<()> {
- if let Some(metadata) = &self.base_file.file_metadata {
- if metadata.fully_populated {
- return Ok(());
- }
- }
- let relative_path = self.base_file_relative_path()?;
- let fetched_metadata =
storage.get_file_metadata(&relative_path).await?;
- self.base_file.file_metadata = Some(fetched_metadata);
- Ok(())
- }
-}
+use std::str::FromStr;
/// Hudi File Group.
#[derive(Clone, Debug)]
@@ -154,26 +92,67 @@ impl FileGroup {
}
pub fn add_base_file_from_name(&mut self, file_name: &str) ->
Result<&Self> {
- let base_file = BaseFile::try_from(file_name)?;
+ let base_file = BaseFile::from_str(file_name)?;
self.add_base_file(base_file)
}
pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
- let instant_time = base_file.instant_time.as_str();
- if self.file_slices.contains_key(instant_time) {
+ let commit_timestamp = base_file.commit_timestamp.as_str();
+ if self.file_slices.contains_key(commit_timestamp) {
Err(CoreError::FileGroup(format!(
- "Instant time {instant_time} is already present in File Group
{}",
+ "Instant time {commit_timestamp} is already present in File
Group {}",
self.file_id
)))
} else {
self.file_slices.insert(
- instant_time.to_owned(),
+ commit_timestamp.to_owned(),
FileSlice::new(base_file, self.partition_path.clone()),
);
Ok(self)
}
}
+ pub fn add_base_files<I>(&mut self, base_files: I) -> Result<&Self>
+ where
+ I: IntoIterator<Item = BaseFile>,
+ {
+ for base_file in base_files {
+ self.add_base_file(base_file)?;
+ }
+ Ok(self)
+ }
+
+ pub fn add_log_file_from_name(&mut self, file_name: &str) -> Result<&Self>
{
+ let log_file = LogFile::from_str(file_name)?;
+ self.add_log_file(log_file)
+ }
+
+ /// Add a [LogFile] to the [FileGroup].
+ ///
+ /// TODO: support adding log files to file group without base files.
+ pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> {
+ let commit_timestamp = log_file.base_commit_timestamp.as_str();
+ if let Some(file_slice) = self.file_slices.get_mut(commit_timestamp) {
+ file_slice.log_files.insert(log_file);
+ Ok(self)
+ } else {
+ Err(CoreError::FileGroup(format!(
+ "Instant time {commit_timestamp} not found in File Group {}",
+ self.file_id
+ )))
+ }
+ }
+
+ pub fn add_log_files<I>(&mut self, log_files: I) -> Result<&Self>
+ where
+ I: IntoIterator<Item = LogFile>,
+ {
+ for log_file in log_files {
+ self.add_log_file(log_file)?;
+ }
+ Ok(self)
+ }
+
pub fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&FileSlice> {
let as_of = timestamp.to_string();
if let Some((_, file_slice)) =
self.file_slices.range(..=as_of).next_back() {
@@ -214,7 +193,7 @@ mod tests {
fg.get_file_slice_as_of("20240402123035233")
.unwrap()
.base_file
- .instant_time,
+ .commit_timestamp,
"20240402123035233"
);
assert!(fg.get_file_slice_as_of("-1").is_none());
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index 49200f7..387b591 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -21,7 +21,7 @@ use crate::config::util::split_hudi_options_from_others;
use crate::config::HudiConfigs;
use crate::error::CoreError::ReadFileSliceError;
use crate::expr::filter::{Filter, SchemableFilter};
-use crate::file_group::FileSlice;
+use crate::file_group::file_slice::FileSlice;
use crate::storage::Storage;
use crate::Result;
use arrow::compute::and;
@@ -30,25 +30,30 @@ use arrow_schema::Schema;
use futures::TryFutureExt;
use std::sync::Arc;
+use crate::file_group::log_file::reader::LogFileReader;
+use crate::merge::record_merger::RecordMerger;
use arrow::compute::filter_record_batch;
/// File group reader handles all read operations against a file group.
#[derive(Clone, Debug)]
pub struct FileGroupReader {
storage: Arc<Storage>,
+ hudi_configs: Arc<HudiConfigs>,
and_filters: Vec<SchemableFilter>,
}
impl FileGroupReader {
- pub fn new(storage: Arc<Storage>) -> Self {
+ pub fn new(storage: Arc<Storage>, hudi_configs: Arc<HudiConfigs>) -> Self {
Self {
storage,
+ hudi_configs,
and_filters: Vec::new(),
}
}
pub fn new_with_filters(
storage: Arc<Storage>,
+ hudi_configs: Arc<HudiConfigs>,
and_filters: &[Filter],
schema: &Schema,
) -> Result<Self> {
@@ -59,6 +64,7 @@ impl FileGroupReader {
Ok(Self {
storage,
+ hudi_configs,
and_filters,
})
}
@@ -77,11 +83,8 @@ impl FileGroupReader {
let hudi_configs = Arc::new(HudiConfigs::new(hudi_opts));
- let storage = Storage::new(Arc::new(others), hudi_configs)?;
- Ok(Self {
- storage,
- and_filters: Vec::new(),
- })
+ let storage = Storage::new(Arc::new(others), hudi_configs.clone())?;
+ Ok(Self::new(storage, hudi_configs))
}
fn create_boolean_array_mask(&self, records: &RecordBatch) ->
Result<BooleanArray> {
@@ -117,9 +120,33 @@ impl FileGroupReader {
.map_err(|e| ReadFileSliceError(format!("Failed to filter records:
{e:?}")))
}
- pub async fn read_file_slice(&self, file_slice: &FileSlice) ->
Result<RecordBatch> {
+ pub async fn read_file_slice(
+ &self,
+ file_slice: &FileSlice,
+ base_file_only: bool,
+ ) -> Result<RecordBatch> {
let relative_path = file_slice.base_file_relative_path()?;
- self.read_file_slice_by_base_file_path(&relative_path).await
+ if base_file_only {
+ // TODO caller to support read optimized queries
+ self.read_file_slice_by_base_file_path(&relative_path).await
+ } else {
+ let base_file_records = self
+ .read_file_slice_by_base_file_path(&relative_path)
+ .await?;
+ let schema = base_file_records.schema();
+ let mut all_records = vec![base_file_records];
+
+ for log_file in &file_slice.log_files {
+ let relative_path =
file_slice.log_file_relative_path(log_file)?;
+ let storage = self.storage.clone();
+ let mut log_file_reader = LogFileReader::new(storage,
&relative_path).await?;
+ let log_file_records =
log_file_reader.read_all_records_unmerged()?;
+ all_records.extend_from_slice(&log_file_records);
+ }
+
+ let merger = RecordMerger::new(self.hudi_configs.clone());
+ merger.merge_record_batches(&schema, &all_records)
+ }
}
}
@@ -138,7 +165,7 @@ mod tests {
fn test_new() {
let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
let storage = Storage::new_with_base_url(base_url).unwrap();
- let fg_reader = FileGroupReader::new(storage.clone());
+ let fg_reader = FileGroupReader::new(storage.clone(),
Arc::from(HudiConfigs::empty()));
assert!(Arc::ptr_eq(&fg_reader.storage, &storage));
}
@@ -155,9 +182,15 @@ mod tests {
let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
let storage = Storage::new_with_base_url(base_url)?;
let schema = create_test_schema();
+ let empty_configs = Arc::new(HudiConfigs::empty());
// Test case 1: Empty filters
- let reader = FileGroupReader::new_with_filters(storage.clone(), &[],
&schema)?;
+ let reader = FileGroupReader::new_with_filters(
+ storage.clone(),
+ empty_configs.clone(),
+ &[],
+ &schema,
+ )?;
assert!(reader.and_filters.is_empty());
// Test case 2: Multiple filters
@@ -165,14 +198,23 @@ mod tests {
FilterField::new("_hoodie_commit_time").gt("0"),
FilterField::new("age").gte("18"),
];
- let reader = FileGroupReader::new_with_filters(storage.clone(),
&filters, &schema)?;
+ let reader = FileGroupReader::new_with_filters(
+ storage.clone(),
+ empty_configs.clone(),
+ &filters,
+ &schema,
+ )?;
assert_eq!(reader.and_filters.len(), 2);
// Test case 3: Invalid field name should error
let invalid_filters =
vec![FilterField::new("non_existent_field").eq("value")];
- assert!(
- FileGroupReader::new_with_filters(storage.clone(),
&invalid_filters, &schema).is_err()
- );
+ assert!(FileGroupReader::new_with_filters(
+ storage.clone(),
+ empty_configs.clone(),
+ &invalid_filters,
+ &schema
+ )
+ .is_err());
Ok(())
}
@@ -194,7 +236,8 @@ mod tests {
let storage =
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())
.unwrap();
- let reader = FileGroupReader::new(storage);
+ let empty_configs = Arc::new(HudiConfigs::empty());
+ let reader = FileGroupReader::new(storage, empty_configs.clone());
let result = reader
.read_file_slice_by_base_file_path("non_existent_file")
.await;
@@ -217,17 +260,28 @@ mod tests {
fn test_create_boolean_array_mask() -> Result<()> {
let storage =
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())?;
+ let empty_configs = Arc::new(HudiConfigs::empty());
let schema = create_test_schema();
let records = create_test_record_batch()?;
// Test case 1: No filters
- let reader = FileGroupReader::new_with_filters(storage.clone(), &[],
&schema)?;
+ let reader = FileGroupReader::new_with_filters(
+ storage.clone(),
+ empty_configs.clone(),
+ &[],
+ &schema,
+ )?;
let mask = reader.create_boolean_array_mask(&records)?;
assert_eq!(mask, BooleanArray::from(vec![true; 5]));
// Test case 2: Single filter on commit time
let filters = vec![FilterField::new("_hoodie_commit_time").gt("2")];
- let reader = FileGroupReader::new_with_filters(storage.clone(),
&filters, &schema)?;
+ let reader = FileGroupReader::new_with_filters(
+ storage.clone(),
+ empty_configs.clone(),
+ &filters,
+ &schema,
+ )?;
let mask = reader.create_boolean_array_mask(&records)?;
assert_eq!(
mask,
@@ -240,7 +294,12 @@ mod tests {
FilterField::new("_hoodie_commit_time").gt("2"),
FilterField::new("age").lt("40"),
];
- let reader = FileGroupReader::new_with_filters(storage.clone(),
&filters, &schema)?;
+ let reader = FileGroupReader::new_with_filters(
+ storage.clone(),
+ empty_configs.clone(),
+ &filters,
+ &schema,
+ )?;
let mask = reader.create_boolean_array_mask(&records)?;
assert_eq!(
mask,
@@ -250,7 +309,12 @@ mod tests {
// Test case 4: Filter resulting in all false
let filters = vec![FilterField::new("age").gt("100")];
- let reader = FileGroupReader::new_with_filters(storage.clone(),
&filters, &schema)?;
+ let reader = FileGroupReader::new_with_filters(
+ storage.clone(),
+ empty_configs.clone(),
+ &filters,
+ &schema,
+ )?;
let mask = reader.create_boolean_array_mask(&records)?;
assert_eq!(mask, BooleanArray::from(vec![false; 5]));
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 85f83b9..ce7695b 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -30,7 +30,7 @@ use futures::StreamExt;
use object_store::path::Path as ObjPath;
use object_store::{parse_url_opts, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
-use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::ParquetMetaData;
use url::Url;
@@ -154,6 +154,17 @@ impl Storage {
Ok(builder.metadata().as_ref().clone())
}
+ pub async fn get_parquet_file_schema(
+ &self,
+ relative_path: &str,
+ ) -> Result<arrow::datatypes::Schema> {
+ let parquet_meta =
self.get_parquet_file_metadata(relative_path).await?;
+ Ok(parquet_to_arrow_schema(
+ parquet_meta.file_metadata().schema_descr(),
+ None,
+ )?)
+ }
+
pub async fn get_file_data(&self, relative_path: &str) -> Result<Bytes> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 0cf2b00..853ef1b 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -22,17 +22,15 @@ use std::collections::HashMap;
use std::env;
use std::hash::Hash;
use std::path::PathBuf;
-use std::str::FromStr;
use std::sync::Arc;
use strum::IntoEnumIterator;
use crate::config::internal::HudiInternalConfig::SkipConfigValidation;
use crate::config::read::HudiReadConfig;
+use crate::config::table::HudiTableConfig;
use crate::config::table::HudiTableConfig::{
- DropsPartitionFields, TableType, TableVersion, TimelineLayoutVersion,
+ DropsPartitionFields, TableVersion, TimelineLayoutVersion,
};
-use crate::config::table::TableTypeValue::CopyOnWrite;
-use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::util::{parse_data_for_options,
split_hudi_options_from_others};
use crate::config::{HudiConfigs, HUDI_CONF_DIR};
use crate::error::CoreError;
@@ -254,13 +252,6 @@ impl TableBuilder {
}
// additional validation
- let table_type = hudi_configs.get(TableType)?.to::<String>();
- if TableTypeValue::from_str(&table_type)? != CopyOnWrite {
- return Err(CoreError::Unsupported(
- "Only support copy-on-write table.".to_string(),
- ));
- }
-
let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
if !(5..=6).contains(&table_version) {
return Err(CoreError::Unsupported(
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 455e823..0abbe08 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -22,13 +22,15 @@ use std::sync::Arc;
use crate::config::HudiConfigs;
use crate::file_group::base_file::BaseFile;
-use crate::file_group::{FileGroup, FileSlice};
+use crate::file_group::FileGroup;
use crate::storage::{get_leaf_dirs, Storage};
use crate::config::read::HudiReadConfig::ListingParallelism;
+use crate::config::table::HudiTableConfig::BaseFileFormat;
use crate::error::CoreError;
-use crate::storage::file_metadata::FileMetadata;
-use crate::table::partition::PartitionPruner;
+use crate::file_group::file_slice::FileSlice;
+use crate::file_group::log_file::LogFile;
+use crate::table::partition::{PartitionPruner, PARTITION_METAFIELD_PREFIX};
use crate::Result;
use dashmap::DashMap;
use futures::stream::{self, StreamExt, TryStreamExt};
@@ -57,6 +59,10 @@ impl FileSystemView {
})
}
+ fn should_exclude_for_listing(file_name: &str) -> bool {
+ file_name.starts_with(PARTITION_METAFIELD_PREFIX) ||
file_name.ends_with(".crc")
+ }
+
async fn list_all_partition_paths(storage: &Storage) ->
Result<Vec<String>> {
Self::list_partition_paths(storage, &PartitionPruner::empty()).await
}
@@ -91,31 +97,61 @@ impl FileSystemView {
async fn list_file_groups_for_partition(
storage: &Storage,
partition_path: &str,
+ base_file_format: &str,
) -> Result<Vec<FileGroup>> {
- let file_metadata: Vec<FileMetadata> = storage
- .list_files(Some(partition_path))
- .await?
- .into_iter()
- .filter(|f| f.name.ends_with(".parquet"))
- .collect();
+ let listed_file_metadata =
storage.list_files(Some(partition_path)).await?;
+
+ let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
+ let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> =
HashMap::new();
+
+ for file_metadata in listed_file_metadata {
+ if Self::should_exclude_for_listing(&file_metadata.name) {
+ continue;
+ }
- let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
- for metadata in file_metadata {
- let base_file = BaseFile::try_from(metadata)?;
- let fg_id = &base_file.file_id;
- fg_id_to_base_files
- .entry(fg_id.to_owned())
- .or_default()
- .push(base_file);
+ let base_file_extension = format!(".{}", base_file_format);
+ if file_metadata.name.ends_with(&base_file_extension) {
+                // After excluding the unintended files, we expect any file
+                // with the base file extension to be a valid base file.
+ let base_file = BaseFile::try_from(file_metadata)?;
+ let file_id = &base_file.file_id;
+ file_id_to_base_files
+ .entry(file_id.to_owned())
+ .or_default()
+ .push(base_file);
+ } else {
+ match LogFile::try_from(file_metadata) {
+ Ok(log_file) => {
+ let file_id = &log_file.file_id;
+ file_id_to_log_files
+ .entry(file_id.to_owned())
+ .or_default()
+ .push(log_file);
+ }
+ Err(e) => {
+                        // We don't support CDC log files yet, so we skip the
+                        // error when parsing fails. However, once all data
+                        // file types are supported, this should return an
+                        // error instead, because after excluding the
+                        // unintended files we expect every file to be either
+                        // a base file or a log file.
+ log::warn!("Failed to create a log file: {}", e);
+ continue;
+ }
+ }
+ }
}
let mut file_groups: Vec<FileGroup> = Vec::new();
- for (fg_id, base_files) in fg_id_to_base_files.into_iter() {
- let mut fg = FileGroup::new(fg_id.to_owned(),
Some(partition_path.to_owned()));
- for bf in base_files {
- fg.add_base_file(bf)?;
- }
- file_groups.push(fg);
+ // TODO support creating file groups without base files
+ for (file_id, base_files) in file_id_to_base_files.into_iter() {
+ let mut file_group =
+ FileGroup::new(file_id.to_owned(),
Some(partition_path.to_owned()));
+
+ file_group.add_base_files(base_files)?;
+
+ let log_files =
file_id_to_log_files.remove(&file_id).unwrap_or_default();
+ file_group.add_log_files(log_files)?;
+
+ file_groups.push(file_group);
}
Ok(file_groups)
}
@@ -129,15 +165,23 @@ impl FileSystemView {
.filter(|p| partition_pruner.should_include(p))
.collect::<HashSet<_>>();
+ let base_file_format = self
+ .hudi_configs
+ .get_or_default(BaseFileFormat)
+ .to::<String>();
let parallelism = self
.hudi_configs
.get_or_default(ListingParallelism)
.to::<usize>();
stream::iter(partition_paths_to_list)
- .map(|path| async move {
- let file_groups =
- Self::list_file_groups_for_partition(&self.storage,
&path).await?;
- Ok::<_, CoreError>((path, file_groups))
+ .map(|path| {
+ let base_file_format = base_file_format.clone();
+ async move {
+ let format = base_file_format.as_str();
+ let file_groups =
+ Self::list_file_groups_for_partition(&self.storage,
&path, format).await?;
+ Ok::<_, CoreError>((path, file_groups))
+ }
})
.buffer_unordered(parallelism)
.try_for_each(|(path, file_groups)| async move {
@@ -194,7 +238,7 @@ mod tests {
use crate::table::partition::PartitionPruner;
use crate::table::Table;
- use hudi_tests::TestTable;
+ use hudi_tests::SampleTable;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use url::Url;
@@ -210,7 +254,7 @@ mod tests {
#[tokio::test]
async fn get_partition_paths_for_nonpartitioned_table() {
- let base_url = TestTable::V6Nonpartitioned.url();
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let storage = Storage::new_with_base_url(base_url).unwrap();
let partition_pruner = PartitionPruner::empty();
let partition_paths = FileSystemView::list_partition_paths(&storage,
&partition_pruner)
@@ -223,7 +267,7 @@ mod tests {
#[tokio::test]
async fn get_partition_paths_for_complexkeygen_table() {
- let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+ let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
let storage = Storage::new_with_base_url(base_url).unwrap();
let partition_pruner = PartitionPruner::empty();
let partition_paths = FileSystemView::list_partition_paths(&storage,
&partition_pruner)
@@ -243,7 +287,7 @@ mod tests {
#[tokio::test]
async fn fs_view_get_latest_file_slices() {
- let base_url = TestTable::V6Nonpartitioned.url();
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let fs_view = create_test_fs_view(base_url).await;
assert!(fs_view.partition_to_file_groups.is_empty());
@@ -255,11 +299,11 @@ mod tests {
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 1);
assert_eq!(file_slices.len(), 1);
- let fg_ids = file_slices
+ let file_ids = file_slices
.iter()
.map(|fsl| fsl.file_id())
.collect::<Vec<_>>();
- assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
+ assert_eq!(file_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
for fsl in file_slices.iter() {
assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 4);
}
@@ -267,7 +311,7 @@ mod tests {
#[tokio::test]
async fn fs_view_get_latest_file_slices_with_replace_commit() {
- let base_url =
TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
+ let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let fs_view = create_test_fs_view(base_url).await;
@@ -284,11 +328,11 @@ mod tests {
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 3);
assert_eq!(file_slices.len(), 1);
- let fg_ids = file_slices
+ let file_ids = file_slices
.iter()
.map(|fsl| fsl.file_id())
.collect::<Vec<_>>();
- assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
+ assert_eq!(file_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
for fsl in file_slices.iter() {
assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 1);
}
@@ -296,7 +340,7 @@ mod tests {
#[tokio::test]
async fn fs_view_get_latest_file_slices_with_partition_filters() {
- let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+ let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let fs_view = create_test_fs_view(base_url).await;
@@ -325,11 +369,11 @@ mod tests {
assert_eq!(fs_view.partition_to_file_groups.len(), 1);
assert_eq!(file_slices.len(), 1);
- let fg_ids = file_slices
+ let file_ids = file_slices
.iter()
.map(|fsl| fsl.file_id())
.collect::<Vec<_>>();
- assert_eq!(fg_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
+ assert_eq!(file_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
for fsl in file_slices.iter() {
assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 2);
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 2c50fc7..0fdd46b 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -90,13 +90,13 @@ mod fs_view;
pub mod partition;
use crate::config::read::HudiReadConfig::AsOfTimestamp;
-use crate::config::table::HudiTableConfig;
use crate::config::table::HudiTableConfig::PartitionFields;
+use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
use crate::error::CoreError;
use crate::expr::filter::{Filter, FilterField};
+use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
-use crate::file_group::FileSlice;
use crate::table::builder::TableBuilder;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
@@ -106,6 +106,7 @@ use crate::Result;
use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use std::collections::{HashMap, HashSet};
+use std::str::FromStr;
use std::sync::Arc;
use url::Url;
@@ -165,6 +166,16 @@ impl Table {
.register_object_store(runtime_env.clone());
}
+ pub fn get_table_type(&self) -> TableTypeValue {
+ let err_msg = format!("{:?} is missing or invalid.",
HudiTableConfig::TableType);
+ let table_type = self
+ .hudi_configs
+ .get(HudiTableConfig::TableType)
+ .expect(&err_msg)
+ .to::<String>();
+ TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
+ }
+
/// Get the latest [Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
@@ -245,7 +256,10 @@ impl Table {
}
pub fn create_file_group_reader(&self) -> FileGroupReader {
- FileGroupReader::new(self.file_system_view.storage.clone())
+ FileGroupReader::new(
+ self.file_system_view.storage.clone(),
+ self.hudi_configs.clone(),
+ )
}
pub async fn create_file_group_reader_with_filters(
@@ -253,7 +267,12 @@ impl Table {
filters: &[Filter],
) -> Result<FileGroupReader> {
let schema = self.get_schema().await?;
-
FileGroupReader::new_with_filters(self.file_system_view.storage.clone(),
filters, &schema)
+ FileGroupReader::new_with_filters(
+ self.file_system_view.storage.clone(),
+ self.hudi_configs.clone(),
+ filters,
+ &schema,
+ )
}
/// Get all the latest records in the table.
@@ -278,9 +297,13 @@ impl Table {
) -> Result<Vec<RecordBatch>> {
let file_slices = self.get_file_slices_as_of(timestamp,
filters).await?;
let fg_reader = self.create_file_group_reader();
- let batches =
- futures::future::try_join_all(file_slices.iter().map(|f|
fg_reader.read_file_slice(f)))
- .await?;
+ let base_file_only = self.get_table_type() ==
TableTypeValue::CopyOnWrite;
+ let batches = futures::future::try_join_all(
+ file_slices
+ .iter()
+ .map(|f| fg_reader.read_file_slice(f, base_file_only)),
+ )
+ .await?;
Ok(batches)
}
@@ -315,9 +338,13 @@ impl Table {
FilterField::new("_hoodie_commit_time").lte(as_of_timestamp),
];
let fg_reader =
self.create_file_group_reader_with_filters(filters).await?;
- let batches =
- futures::future::try_join_all(file_slices.iter().map(|f|
fg_reader.read_file_slice(f)))
- .await?;
+ let base_file_only = self.get_table_type() ==
TableTypeValue::CopyOnWrite;
+ let batches = futures::future::try_join_all(
+ file_slices
+ .iter()
+ .map(|f| fg_reader.read_file_slice(f, base_file_only)),
+ )
+ .await?;
Ok(batches)
}
@@ -346,7 +373,7 @@ mod tests {
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
use crate::table::Filter;
- use hudi_tests::{assert_not, TestTable};
+ use hudi_tests::{assert_not, SampleTable};
use std::collections::HashSet;
use std::fs::canonicalize;
use std::path::PathBuf;
@@ -384,7 +411,7 @@ mod tests {
#[tokio::test]
async fn test_hudi_table_get_hudi_options() {
- let base_url = TestTable::V6Nonpartitioned.url();
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let hudi_options = hudi_table.hudi_options();
for (k, v) in hudi_options.iter() {
@@ -395,7 +422,7 @@ mod tests {
#[tokio::test]
async fn test_hudi_table_get_storage_options() {
- let base_url = TestTable::V6Nonpartitioned.url();
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let cloud_prefixes: HashSet<_> = Storage::CLOUD_STORAGE_PREFIXES
@@ -422,7 +449,7 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_schema() {
- let base_url = TestTable::V6Nonpartitioned.url();
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let fields: Vec<String> = hudi_table
.get_schema()
@@ -475,7 +502,7 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_partition_schema() {
- let base_url = TestTable::V6TimebasedkeygenNonhivestyle.url();
+ let base_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let fields: Vec<String> = hudi_table
.get_partition_schema()
@@ -566,7 +593,10 @@ mod tests {
async fn get_default_for_invalid_table_props() {
let table =
get_test_table_without_validation("table_props_invalid").await;
let configs = table.hudi_configs;
- assert!(panic::catch_unwind(||
configs.get_or_default(BaseFileFormat)).is_err());
+ assert_eq!(
+ configs.get_or_default(BaseFileFormat).to::<String>(),
+ "parquet"
+ );
assert!(panic::catch_unwind(||
configs.get_or_default(Checksum)).is_err());
assert_eq!(
configs.get_or_default(DatabaseName).to::<String>(),
@@ -672,7 +702,7 @@ mod tests {
#[tokio::test]
async fn hudi_table_read_file_slice() {
- let base_url = TestTable::V6Nonpartitioned.url();
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let batches = hudi_table
.create_file_group_reader()
@@ -687,7 +717,7 @@ mod tests {
#[tokio::test]
async fn empty_hudi_table_get_file_slices_splits() {
- let base_url = TestTable::V6Empty.url();
+ let base_url = SampleTable::V6Empty.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices_splits = hudi_table.get_file_slices_splits(2,
&[]).await.unwrap();
@@ -696,7 +726,7 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_file_slices_splits() {
- let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
+ let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices_splits = hudi_table.get_file_slices_splits(2,
&[]).await.unwrap();
@@ -707,7 +737,7 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_file_slices_as_of_timestamps() {
- let base_url = TestTable::V6Nonpartitioned.url();
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
@@ -764,7 +794,7 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
- let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
+ let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
@@ -814,7 +844,7 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
- let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+ let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
@@ -865,24 +895,77 @@ mod tests {
mod test_snapshot_queries {
use super::super::*;
- use arrow_array::{Array, StringArray};
- use hudi_tests::TestTable;
- use std::collections::HashSet;
+ use crate::metadata::meta_field::MetaField;
+ use arrow_array::{Array, BooleanArray, Int32Array, StringArray};
+ use hudi_tests::SampleTable;
#[tokio::test]
async fn test_empty() -> Result<()> {
- let base_url = TestTable::V6Empty.url();
- let hudi_table = Table::new(base_url.path()).await?;
+ let base_urls = [
+ SampleTable::V6Empty.url_to_cow(),
+ SampleTable::V6Empty.url_to_mor(),
+ ];
+ for base_url in base_urls.iter() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_snapshot(&[]).await?;
+ assert!(records.is_empty());
+ }
+ Ok(())
+ }
- let records = hudi_table.read_snapshot(&[]).await?;
- assert!(records.is_empty());
+ #[tokio::test]
+ async fn test_non_partitioned() -> Result<()> {
+ let base_urls = [
+ SampleTable::V6Nonpartitioned.url_to_cow(),
+ SampleTable::V6Nonpartitioned.url_to_mor(),
+ ];
+ for base_url in base_urls.iter() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_snapshot(&[]).await?;
+ let all_records =
arrow::compute::concat_batches(&records[0].schema(), &records)?;
+
+ let ids = all_records
+ .column_by_name("id")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let names = all_records
+ .column_by_name("name")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ let is_actives = all_records
+ .column_by_name("isActive")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap();
+ let mut data: Vec<(i32, &str, bool)> = ids
+ .iter()
+ .zip(names.iter())
+ .zip(is_actives.iter())
+ .map(|((id, name), is_active)| (id.unwrap(),
name.unwrap(), is_active.unwrap()))
+ .collect();
+ data.sort_unstable_by_key(|(id, _, _)| *id);
+ assert_eq!(
+ data,
+ vec![
+ (1, "Alice", false),
+ (2, "Bob", false),
+ (3, "Carol", true),
+ (4, "Diana", true),
+ ]
+ )
+ }
Ok(())
}
#[tokio::test]
- async fn test_complex_keygen_hive_style() -> Result<()> {
- let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+ async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
+ let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_mor();
let hudi_table = Table::new(base_url.path()).await?;
let filters = &[
@@ -891,32 +974,50 @@ mod tests {
Filter::try_from(("shortField", "!=", "100"))?,
];
let records = hudi_table.read_snapshot(filters).await?;
- assert_eq!(records.len(), 1);
- assert_eq!(records[0].num_rows(), 2);
+ let all_records =
arrow::compute::concat_batches(&records[0].schema(), &records)?;
- let partition_paths = StringArray::from(
- records[0]
- .column_by_name("_hoodie_partition_path")
- .unwrap()
- .to_data(),
- );
- let actual_partition_paths =
- HashSet::<&str>::from_iter(partition_paths.iter().map(|s|
s.unwrap()));
- let expected_partition_paths =
HashSet::from_iter(vec!["byteField=10/shortField=300"]);
- assert_eq!(actual_partition_paths, expected_partition_paths);
+ let partition_paths = all_records
+ .column_by_name(MetaField::PartitionPath.as_ref())
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ let ids = all_records
+ .column_by_name("id")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let names = all_records
+ .column_by_name("name")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ let is_actives = all_records
+ .column_by_name("isActive")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap();
- let file_names = StringArray::from(
- records[0]
- .column_by_name("_hoodie_file_name")
- .unwrap()
- .to_data(),
+ let mut data: Vec<(&str, i32, &str, bool)> = partition_paths
+ .iter()
+ .zip(ids.iter())
+ .zip(names.iter())
+ .zip(is_actives.iter())
+ .map(|(((pt, id), name), is_active)| {
+ (pt.unwrap(), id.unwrap(), name.unwrap(),
is_active.unwrap())
+ })
+ .collect();
+ data.sort_unstable_by_key(|(_, id, _, _)| *id);
+ assert_eq!(
+ data,
+ vec![
+ ("byteField=10/shortField=300", 1, "Alice", false),
+ ("byteField=10/shortField=300", 3, "Carol", true),
+ ]
);
- let actual_file_names =
- HashSet::<&str>::from_iter(file_names.iter().map(|s|
s.unwrap()));
- let expected_file_names: HashSet<&str> = HashSet::from_iter(vec![
-
"a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
- ]);
- assert_eq!(actual_file_names, expected_file_names);
Ok(())
}
@@ -925,12 +1026,12 @@ mod tests {
mod test_incremental_queries {
use super::super::*;
use arrow_array::{Array, StringArray};
- use hudi_tests::TestTable;
+ use hudi_tests::SampleTable;
use std::collections::HashSet;
#[tokio::test]
async fn test_empty() -> Result<()> {
- let base_url = TestTable::V6Empty.url();
+ let base_url = SampleTable::V6Empty.url_to_cow();
let hudi_table = Table::new(base_url.path()).await?;
let records = hudi_table.read_incremental_records("0",
None).await?;
@@ -941,7 +1042,7 @@ mod tests {
#[tokio::test]
async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()>
{
- let base_url =
TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
+ let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
let hudi_table = Table::new(base_url.path()).await?;
// read records changed from the first commit (exclusive) to the
second commit (inclusive)
diff --git a/crates/core/src/table/partition.rs
b/crates/core/src/table/partition.rs
index 0386ad2..be541c2 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -28,6 +28,8 @@ use arrow_schema::Schema;
use std::collections::HashMap;
use std::sync::Arc;
+pub const PARTITION_METAFIELD_PREFIX: &str = ".hoodie_partition_metadata";
+
/// A partition pruner that filters partitions based on the partition path and
its filters.
#[derive(Debug, Clone)]
pub struct PartitionPruner {
diff --git a/crates/core/src/timeline/instant.rs
b/crates/core/src/timeline/instant.rs
index 96bf569..e715b6a 100644
--- a/crates/core/src/timeline/instant.rs
+++ b/crates/core/src/timeline/instant.rs
@@ -28,6 +28,7 @@ use std::str::FromStr;
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Action {
Commit,
+ DeltaCommit,
ReplaceCommit,
}
@@ -37,6 +38,7 @@ impl FromStr for Action {
fn from_str(s: &str) -> Result<Self> {
match s {
"commit" => Ok(Action::Commit),
+ "deltacommit" => Ok(Action::DeltaCommit),
"replacecommit" => Ok(Action::ReplaceCommit),
_ => Err(CoreError::Timeline(format!("Invalid action: {}", s))),
}
@@ -47,6 +49,7 @@ impl Action {
pub fn as_str(&self) -> &str {
match self {
Action::Commit => "commit",
+ Action::DeltaCommit => "deltacommit",
Action::ReplaceCommit => "replacecommit",
}
}
@@ -115,11 +118,11 @@ impl Ord for Instant {
}
}
-impl TryFrom<&str> for Instant {
- type Error = CoreError;
+impl FromStr for Instant {
+ type Err = CoreError;
/// Parse a timeline file name into an [Instant]. Timezone is assumed to
be UTC.
- fn try_from(file_name: &str) -> Result<Self> {
+ fn from_str(file_name: &str) -> Result<Self, Self::Err> {
Self::try_from_file_name_and_timezone(file_name, "UTC")
}
}
@@ -280,13 +283,13 @@ mod tests {
#[test]
fn test_instant_from_file_name() -> Result<()> {
// Test completed commit
- let instant = Instant::try_from("20240103153000.commit")?;
+ let instant = Instant::from_str("20240103153000.commit")?;
assert_eq!(instant.timestamp, "20240103153000");
assert_eq!(instant.action, Action::Commit);
assert_eq!(instant.state, State::Completed);
// Test inflight replacecommit with milliseconds
- let instant =
Instant::try_from("20240103153000123.replacecommit.inflight")?;
+ let instant =
Instant::from_str("20240103153000123.replacecommit.inflight")?;
assert_eq!(instant.timestamp, "20240103153000123");
assert_eq!(instant.action, Action::ReplaceCommit);
assert_eq!(instant.state, State::Inflight);
@@ -297,16 +300,16 @@ mod tests {
#[test]
fn test_invalid_file_names() {
// Invalid timestamp format
- assert!(Instant::try_from("2024010315.commit").is_err());
+ assert!(Instant::from_str("2024010315.commit").is_err());
// Invalid action
- assert!(Instant::try_from("20240103153000.invalid").is_err());
+ assert!(Instant::from_str("20240103153000.invalid").is_err());
// Invalid state
- assert!(Instant::try_from("20240103153000.commit.invalid").is_err());
+ assert!(Instant::from_str("20240103153000.commit.invalid").is_err());
// No dot separator
- assert!(Instant::try_from("20240103153000commit").is_err());
+ assert!(Instant::from_str("20240103153000commit").is_err());
}
#[test]
@@ -319,7 +322,7 @@ mod tests {
];
for original_name in test_cases {
- let instant = Instant::try_from(original_name)?;
+ let instant = Instant::from_str(original_name)?;
assert_eq!(instant.file_name(), original_name);
}
@@ -328,12 +331,12 @@ mod tests {
#[test]
fn test_instant_ordering() -> Result<()> {
- let instant1 = Instant::try_from("20240103153000.commit")?;
- let instant2 = Instant::try_from("20240103153000001.commit")?;
- let instant3 =
Instant::try_from("20240103153000999.commit.requested")?;
- let instant4 = Instant::try_from("20240103153000999.inflight")?;
- let instant5 = Instant::try_from("20240103153000999.commit")?;
- let instant6 = Instant::try_from("20240103153001.commit")?;
+ let instant1 = Instant::from_str("20240103153000.commit")?;
+ let instant2 = Instant::from_str("20240103153000001.commit")?;
+ let instant3 =
Instant::from_str("20240103153000999.commit.requested")?;
+ let instant4 = Instant::from_str("20240103153000999.inflight")?;
+ let instant5 = Instant::from_str("20240103153000999.commit")?;
+ let instant6 = Instant::from_str("20240103153001.commit")?;
assert!(instant1 < instant2);
assert!(instant2 < instant3);
@@ -346,7 +349,7 @@ mod tests {
#[test]
fn test_relative_path() {
- let instant = Instant::try_from("20240103153000.commit").unwrap();
+ let instant = Instant::from_str("20240103153000.commit").unwrap();
assert_eq!(
instant.relative_path().unwrap(),
".hoodie/20240103153000.commit"
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 463c754..e1fc486 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -29,10 +29,11 @@ use crate::Result;
use arrow_schema::Schema;
use instant::Instant;
use log::debug;
-use parquet::arrow::parquet_to_arrow_schema;
use serde_json::{Map, Value};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
+use std::path::PathBuf;
+use std::str::FromStr;
use std::sync::Arc;
/// A [Timeline] contains transaction logs of all actions performed on the
table at different [Instant]s of time.
@@ -128,25 +129,55 @@ impl Timeline {
pub async fn get_latest_schema(&self) -> Result<Schema> {
let commit_metadata = self.get_latest_commit_metadata().await?;
- let parquet_path = commit_metadata
+ let first_partition = commit_metadata
.get("partitionToWriteStats")
- .and_then(|v| v.as_object())
+ .and_then(|v| v.as_object());
+
+ let partition_path = first_partition
+ .and_then(|obj| obj.keys().next())
+ .ok_or_else(|| {
+ CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: no partition path
found".to_string(),
+ )
+ })?;
+
+ let first_value = first_partition
.and_then(|obj| obj.values().next())
.and_then(|value| value.as_array())
- .and_then(|arr| arr.first())
- .and_then(|first_value| first_value["path"].as_str());
-
- if let Some(path) = parquet_path {
- let parquet_meta =
self.storage.get_parquet_file_metadata(path).await?;
-
- Ok(parquet_to_arrow_schema(
- parquet_meta.file_metadata().schema_descr(),
- None,
- )?)
- } else {
- Err(CoreError::Timeline(
+ .and_then(|arr| arr.first());
+
+ let parquet_path = first_value.and_then(|v| v["path"].as_str());
+ match parquet_path {
+ Some(path) if path.ends_with(".parquet") => {
+ Ok(self.storage.get_parquet_file_schema(path).await?)
+ }
+ Some(_) => {
+ // TODO: properly handle deltacommit
+ let base_file = first_value
+ .and_then(|v| v["baseFile"].as_str())
+ .ok_or_else(|| {
+ CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: no file path
found".to_string(),
+ )
+ })?;
+ let parquet_file_path_buf = PathBuf::from_str(partition_path)
+ .map_err(|e| {
+ CoreError::CommitMetadata(format!(
+ "Failed to resolve the latest schema: {}",
+ e
+ ))
+ })?
+ .join(base_file);
+ let path = parquet_file_path_buf.to_str().ok_or_else(|| {
+ CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: invalid file
path".to_string(),
+ )
+ })?;
+ Ok(self.storage.get_parquet_file_schema(path).await?)
+ }
+ None => Err(CoreError::CommitMetadata(
"Failed to resolve the latest schema: no file path
found".to_string(),
- ))
+ )),
}
}
@@ -200,11 +231,12 @@ mod tests {
use std::collections::HashMap;
use std::fs::canonicalize;
use std::path::Path;
+ use std::str::FromStr;
use std::sync::Arc;
use url::Url;
- use hudi_tests::TestTable;
+ use hudi_tests::SampleTable;
use crate::config::table::HudiTableConfig;
@@ -219,7 +251,7 @@ mod tests {
#[tokio::test]
async fn timeline_read_latest_schema() {
- let base_url = TestTable::V6Nonpartitioned.url();
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let timeline = create_test_timeline(base_url).await;
let table_schema = timeline.get_latest_schema().await.unwrap();
assert_eq!(table_schema.fields.len(), 21)
@@ -227,14 +259,15 @@ mod tests {
#[tokio::test]
async fn timeline_read_latest_schema_from_empty_table() {
- let base_url = TestTable::V6Empty.url();
+ let base_url = SampleTable::V6Empty.url_to_cow();
let timeline = create_test_timeline(base_url).await;
let table_schema = timeline.get_latest_schema().await;
assert!(table_schema.is_err());
- assert_eq!(
- table_schema.err().unwrap().to_string(),
- "Timeline error: Failed to resolve the latest schema: no file path found"
- )
+ assert!(table_schema
+ .err()
+ .unwrap()
+ .to_string()
+ .starts_with("Commit metadata error: Failed to resolve the latest schema:"))
}
#[tokio::test]
@@ -247,8 +280,8 @@ mod tests {
assert_eq!(
timeline.completed_commits,
vec![
- Instant::try_from("20240402123035233.commit").unwrap(),
- Instant::try_from("20240402144910683.commit").unwrap(),
+ Instant::from_str("20240402123035233.commit").unwrap(),
+ Instant::from_str("20240402144910683.commit").unwrap(),
]
)
}
@@ -263,7 +296,7 @@ mod tests {
)
.unwrap();
let timeline = create_test_timeline(base_url).await;
- let instant = Instant::try_from("20240402123035233.commit").unwrap();
+ let instant = Instant::from_str("20240402123035233.commit").unwrap();
// Test error when reading empty commit metadata file
let result = timeline.get_commit_metadata(&instant).await;
@@ -272,7 +305,7 @@ mod tests {
assert!(matches!(err, CoreError::Timeline(_)));
assert!(err.to_string().contains("Failed to get commit metadata"));
- let instant = Instant::try_from("20240402144910683.commit").unwrap();
+ let instant = Instant::from_str("20240402144910683.commit").unwrap();
// Test error when reading a commit metadata file with invalid JSON
let result = timeline.get_commit_metadata(&instant).await;
diff --git a/crates/core/src/timeline/selector.rs
b/crates/core/src/timeline/selector.rs
index a50d48e..e49e374 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -65,7 +65,7 @@ impl TimelineSelector {
start_datetime,
end_datetime,
states: vec![State::Completed],
- actions: vec![Action::Commit, Action::ReplaceCommit],
+ actions: vec![Action::Commit, Action::DeltaCommit, Action::ReplaceCommit],
include_archived: false,
})
}
@@ -188,6 +188,7 @@ mod tests {
use crate::config::HudiConfigs;
use hudi_tests::assert_not;
use std::collections::HashMap;
+ use std::str::FromStr;
use std::sync::Arc;
fn create_test_selector(
@@ -257,12 +258,12 @@ mod tests {
async fn create_test_timeline() -> Timeline {
let instants = vec![
- Instant::try_from("20240103153000.commit").unwrap(),
- Instant::try_from("20240103153010999.commit").unwrap(),
- Instant::try_from("20240103153020999.commit.requested").unwrap(),
- Instant::try_from("20240103153020999.inflight").unwrap(),
- Instant::try_from("20240103153020999.commit").unwrap(),
- Instant::try_from("20240103153030999.commit").unwrap(),
+ Instant::from_str("20240103153000.commit").unwrap(),
+ Instant::from_str("20240103153010999.commit").unwrap(),
+ Instant::from_str("20240103153020999.commit.requested").unwrap(),
+ Instant::from_str("20240103153020999.inflight").unwrap(),
+ Instant::from_str("20240103153020999.commit").unwrap(),
+ Instant::from_str("20240103153030999.commit").unwrap(),
];
Timeline::new_from_completed_commits(
Arc::new(HudiConfigs::new([(
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 0c27e74..2c14e56 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -346,12 +346,12 @@ mod tests {
use datafusion::logical_expr::BinaryExpr;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
- use hudi_tests::TestTable::{
+ use hudi_tests::SampleTable::{
V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned,
V6SimplekeygenHivestyleNoMetafields,
V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
V6TimebasedkeygenNonhivestyle,
};
- use hudi_tests::{utils, TestTable};
+ use hudi_tests::{utils, SampleTable};
use utils::{get_bool_column, get_i32_column, get_str_column};
use crate::HudiDataSource;
@@ -369,7 +369,7 @@ mod tests {
#[tokio::test]
async fn test_get_empty_schema_from_empty_table() {
let table_provider =
- HudiDataSource::new_with_options(V6Empty.path().as_str(), empty_options())
+ HudiDataSource::new_with_options(V6Empty.path_to_cow().as_str(), empty_options())
.await
.unwrap();
let schema = table_provider.schema();
@@ -377,7 +377,7 @@ mod tests {
}
async fn register_test_table_with_session<I, K, V>(
- test_table: &TestTable,
+ test_table: &SampleTable,
options: I,
use_sql: bool,
) -> Result<SessionContext, DataFusionError>
@@ -391,12 +391,12 @@ mod tests {
let create_table_sql = format!(
"CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}",
test_table.as_ref(),
- test_table.path(),
+ test_table.path_to_cow(),
concat_as_sql_options(options)
);
ctx.sql(create_table_sql.as_str()).await?;
} else {
- let base_url = test_table.url();
+ let base_url = test_table.url_to_cow();
let hudi = HudiDataSource::new_with_options(base_url.as_str(), options).await?;
ctx.register_table(test_table.as_ref(), Arc::new(hudi))?;
}
@@ -445,7 +445,7 @@ mod tests {
"CREATE EXTERNAL TABLE {} STORED AS {} LOCATION '{}'",
test_table.as_ref(),
invalid_format,
- test_table.path()
+ test_table.path_to_cow()
);
let ctx = create_test_session().await;
@@ -556,10 +556,12 @@ mod tests {
#[tokio::test]
async fn test_supports_filters_pushdown() {
- let table_provider =
- HudiDataSource::new_with_options(V6Nonpartitioned.path().as_str(), empty_options())
- .await
- .unwrap();
+ let table_provider = HudiDataSource::new_with_options(
+ V6Nonpartitioned.path_to_cow().as_str(),
+ empty_options(),
+ )
+ .await
+ .unwrap();
let expr0 = Expr::BinaryExpr(BinaryExpr {
left:
Box::new(Expr::Column(Column::from_name("name".to_string()))),
diff --git a/crates/tests/data/tables/v6_complexkeygen_hivestyle.sql
b/crates/tests/data/tables/cow/v6_complexkeygen_hivestyle.sql
similarity index 100%
copy from crates/tests/data/tables/v6_complexkeygen_hivestyle.sql
copy to crates/tests/data/tables/cow/v6_complexkeygen_hivestyle.sql
diff --git a/crates/tests/data/tables/v6_complexkeygen_hivestyle.zip
b/crates/tests/data/tables/cow/v6_complexkeygen_hivestyle.zip
similarity index 100%
rename from crates/tests/data/tables/v6_complexkeygen_hivestyle.zip
rename to crates/tests/data/tables/cow/v6_complexkeygen_hivestyle.zip
diff --git a/crates/tests/data/tables/v6_empty.sql
b/crates/tests/data/tables/cow/v6_empty.sql
similarity index 100%
copy from crates/tests/data/tables/v6_empty.sql
copy to crates/tests/data/tables/cow/v6_empty.sql
diff --git a/crates/tests/data/tables/v6_empty.zip
b/crates/tests/data/tables/cow/v6_empty.zip
similarity index 100%
rename from crates/tests/data/tables/v6_empty.zip
rename to crates/tests/data/tables/cow/v6_empty.zip
diff --git a/crates/tests/data/tables/v6_nonpartitioned.sql
b/crates/tests/data/tables/cow/v6_nonpartitioned.sql
similarity index 100%
copy from crates/tests/data/tables/v6_nonpartitioned.sql
copy to crates/tests/data/tables/cow/v6_nonpartitioned.sql
diff --git a/crates/tests/data/tables/v6_nonpartitioned.zip
b/crates/tests/data/tables/cow/v6_nonpartitioned.zip
similarity index 100%
rename from crates/tests/data/tables/v6_nonpartitioned.zip
rename to crates/tests/data/tables/cow/v6_nonpartitioned.zip
diff --git
a/crates/tests/data/tables/v6_simplekeygen_hivestyle_no_metafields.sql
b/crates/tests/data/tables/cow/v6_simplekeygen_hivestyle_no_metafields.sql
similarity index 100%
rename from crates/tests/data/tables/v6_simplekeygen_hivestyle_no_metafields.sql
rename to
crates/tests/data/tables/cow/v6_simplekeygen_hivestyle_no_metafields.sql
diff --git
a/crates/tests/data/tables/v6_simplekeygen_hivestyle_no_metafields.zip
b/crates/tests/data/tables/cow/v6_simplekeygen_hivestyle_no_metafields.zip
similarity index 100%
rename from crates/tests/data/tables/v6_simplekeygen_hivestyle_no_metafields.zip
rename to
crates/tests/data/tables/cow/v6_simplekeygen_hivestyle_no_metafields.zip
diff --git a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle.sql
b/crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle.sql
similarity index 100%
rename from crates/tests/data/tables/v6_simplekeygen_nonhivestyle.sql
rename to crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle.sql
diff --git a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle.zip
b/crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle.zip
similarity index 100%
rename from crates/tests/data/tables/v6_simplekeygen_nonhivestyle.zip
rename to crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle.zip
diff --git
a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.sql
b/crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle_overwritetable.sql
similarity index 100%
rename from
crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.sql
rename to
crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle_overwritetable.sql
diff --git
a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.zip
b/crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle_overwritetable.zip
similarity index 100%
rename from
crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.zip
rename to
crates/tests/data/tables/cow/v6_simplekeygen_nonhivestyle_overwritetable.zip
diff --git a/crates/tests/data/tables/v6_timebasedkeygen_nonhivestyle.sql
b/crates/tests/data/tables/cow/v6_timebasedkeygen_nonhivestyle.sql
similarity index 100%
rename from crates/tests/data/tables/v6_timebasedkeygen_nonhivestyle.sql
rename to crates/tests/data/tables/cow/v6_timebasedkeygen_nonhivestyle.sql
diff --git a/crates/tests/data/tables/v6_timebasedkeygen_nonhivestyle.zip
b/crates/tests/data/tables/cow/v6_timebasedkeygen_nonhivestyle.zip
similarity index 100%
rename from crates/tests/data/tables/v6_timebasedkeygen_nonhivestyle.zip
rename to crates/tests/data/tables/cow/v6_timebasedkeygen_nonhivestyle.zip
diff --git a/crates/tests/data/tables/v6_complexkeygen_hivestyle.sql
b/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.sql
similarity index 93%
rename from crates/tests/data/tables/v6_complexkeygen_hivestyle.sql
rename to crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.sql
index 77a1fa7..64f2efd 100644
--- a/crates/tests/data/tables/v6_complexkeygen_hivestyle.sql
+++ b/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.sql
@@ -43,12 +43,17 @@ CREATE TABLE v6_complexkeygen_hivestyle (
shortField SHORT
)
USING HUDI
+ LOCATION '/opt/data/external_tables/v6_complexkeygen_hivestyle'
TBLPROPERTIES (
- type = 'cow',
+ type = 'mor',
primaryKey = 'id,name',
preCombineField = 'longField',
'hoodie.metadata.enable' = 'false',
- 'hoodie.datasource.write.hive_style_partitioning' = 'true'
+ 'hoodie.datasource.write.hive_style_partitioning' = 'true',
+ 'hoodie.table.log.file.format' = 'PARQUET',
+ 'hoodie.logfile.data.block.format' = 'parquet',
+ 'hoodie.datasource.write.record.merger.impls' = 'org.apache.hudi.HoodieSparkRecordMerger',
+ 'hoodie.parquet.small.file.limit' = '0'
)
PARTITIONED BY (byteField, shortField);
diff --git a/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.zip
b/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.zip
new file mode 100644
index 0000000..822728f
Binary files /dev/null and
b/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.zip differ
diff --git a/crates/tests/data/tables/v6_empty.sql
b/crates/tests/data/tables/mor/v6_empty.sql
similarity index 97%
rename from crates/tests/data/tables/v6_empty.sql
rename to crates/tests/data/tables/mor/v6_empty.sql
index 6db4624..7c6a7db 100644
--- a/crates/tests/data/tables/v6_empty.sql
+++ b/crates/tests/data/tables/mor/v6_empty.sql
@@ -24,7 +24,7 @@ create table v6_empty (
)
USING HUDI
TBLPROPERTIES (
- type = 'cow',
+ type = 'mor',
primaryKey = 'id',
'hoodie.metadata.enable' = 'false'
);
diff --git a/crates/tests/data/tables/mor/v6_empty.zip
b/crates/tests/data/tables/mor/v6_empty.zip
new file mode 100644
index 0000000..6a8e90a
Binary files /dev/null and b/crates/tests/data/tables/mor/v6_empty.zip differ
diff --git a/crates/tests/data/tables/v6_nonpartitioned.sql
b/crates/tests/data/tables/mor/v6_nonpartitioned.sql
similarity index 94%
rename from crates/tests/data/tables/v6_nonpartitioned.sql
rename to crates/tests/data/tables/mor/v6_nonpartitioned.sql
index d581dfa..32ff4c8 100644
--- a/crates/tests/data/tables/v6_nonpartitioned.sql
+++ b/crates/tests/data/tables/mor/v6_nonpartitioned.sql
@@ -44,10 +44,14 @@ CREATE TABLE v6_nonpartitioned (
)
USING HUDI
TBLPROPERTIES (
- type = 'cow',
+ type = 'mor',
primaryKey = 'id',
preCombineField = 'longField',
- 'hoodie.metadata.enable' = 'false'
+ 'hoodie.metadata.enable' = 'false',
+ 'hoodie.table.log.file.format' = 'PARQUET',
+ 'hoodie.logfile.data.block.format' = 'parquet',
+ 'hoodie.datasource.write.record.merger.impls' = 'org.apache.hudi.HoodieSparkRecordMerger',
+ 'hoodie.parquet.small.file.limit' = '0'
);
INSERT INTO v6_nonpartitioned VALUES
diff --git a/crates/tests/data/tables/mor/v6_nonpartitioned.zip
b/crates/tests/data/tables/mor/v6_nonpartitioned.zip
new file mode 100644
index 0000000..c4eb63e
Binary files /dev/null and b/crates/tests/data/tables/mor/v6_nonpartitioned.zip
differ
diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs
index 457ae4e..a7b1e7b 100644
--- a/crates/tests/src/lib.rs
+++ b/crates/tests/src/lib.rs
@@ -36,7 +36,7 @@ pub fn extract_test_table(zip_path: &Path) -> PathBuf {
#[derive(Debug, EnumString, AsRefStr, EnumIter)]
#[strum(serialize_all = "snake_case")]
-pub enum TestTable {
+pub enum SampleTable {
V6ComplexkeygenHivestyle,
V6Empty,
V6Nonpartitioned,
@@ -46,23 +46,35 @@ pub enum TestTable {
V6TimebasedkeygenNonhivestyle,
}
-impl TestTable {
- pub fn zip_path(&self) -> Box<Path> {
+impl SampleTable {
+ fn zip_path(&self, table_type: &str) -> Box<Path> {
let dir = env!("CARGO_MANIFEST_DIR");
let data_path = Path::new(dir)
.join("data/tables")
+ .join(table_type.to_lowercase())
.join(format!("{}.zip", self.as_ref()));
data_path.into_boxed_path()
}
- pub fn path(&self) -> String {
- let zip_path = self.zip_path();
+ pub fn path_to_cow(&self) -> String {
+ let zip_path = self.zip_path("cow");
let path_buf = extract_test_table(zip_path.as_ref()).join(self.as_ref());
path_buf.to_str().unwrap().to_string()
}
- pub fn url(&self) -> Url {
- let path = self.path();
+ pub fn path_to_mor(&self) -> String {
+ let zip_path = self.zip_path("mor");
+ let path_buf = extract_test_table(zip_path.as_ref()).join(self.as_ref());
+ path_buf.to_str().unwrap().to_string()
+ }
+
+ pub fn url_to_cow(&self) -> Url {
+ let path = self.path_to_cow();
+ Url::from_file_path(path).unwrap()
+ }
+
+ pub fn url_to_mor(&self) -> Url {
+ let path = self.path_to_mor();
Url::from_file_path(path).unwrap()
}
}
@@ -71,12 +83,12 @@ impl TestTable {
mod tests {
use strum::IntoEnumIterator;
- use crate::TestTable;
+ use crate::SampleTable;
#[test]
- fn test_table_zip_file_should_exist() {
- for t in TestTable::iter() {
- let path = t.zip_path();
+ fn sample_table_zip_file_should_exist() {
+ for t in SampleTable::iter() {
+ let path = t.zip_path("cow");
assert!(path.exists());
assert!(path.is_file());
}
diff --git a/demo/app/python/src/main.py b/demo/app/python/src/main.py
index 8c08bf6..cc87068 100644
--- a/demo/app/python/src/main.py
+++ b/demo/app/python/src/main.py
@@ -18,35 +18,37 @@
from hudi import HudiTableBuilder
import pyarrow as pa
-hudi_table = HudiTableBuilder.from_base_uri(
- "s3://hudi-demo/v6_complexkeygen_hivestyle"
-).build()
-records = hudi_table.read_snapshot()
+for url in [
+ "s3://hudi-demo/cow/v6_complexkeygen_hivestyle",
+ "s3://hudi-demo/mor/v6_complexkeygen_hivestyle",
+]:
+ hudi_table = HudiTableBuilder.from_base_uri(url).build()
+ records = hudi_table.read_snapshot()
-arrow_table = pa.Table.from_batches(records)
-assert arrow_table.schema.names == [
- "_hoodie_commit_time",
- "_hoodie_commit_seqno",
- "_hoodie_record_key",
- "_hoodie_partition_path",
- "_hoodie_file_name",
- "id",
- "name",
- "isActive",
- "intField",
- "longField",
- "floatField",
- "doubleField",
- "decimalField",
- "dateField",
- "timestampField",
- "binaryField",
- "arrayField",
- "mapField",
- "structField",
- "byteField",
- "shortField",
-]
-assert arrow_table.num_rows == 4
+ arrow_table = pa.Table.from_batches(records)
+ assert arrow_table.schema.names == [
+ "_hoodie_commit_time",
+ "_hoodie_commit_seqno",
+ "_hoodie_record_key",
+ "_hoodie_partition_path",
+ "_hoodie_file_name",
+ "id",
+ "name",
+ "isActive",
+ "intField",
+ "longField",
+ "floatField",
+ "doubleField",
+ "decimalField",
+ "dateField",
+ "timestampField",
+ "binaryField",
+ "arrayField",
+ "mapField",
+ "structField",
+ "byteField",
+ "shortField",
+ ]
+ assert arrow_table.num_rows == 4
print("Python API: read snapshot successfully!")
diff --git a/demo/app/rust/src/main.rs b/demo/app/rust/src/main.rs
index 9b5ea69..1bfbfd3 100644
--- a/demo/app/rust/src/main.rs
+++ b/demo/app/rust/src/main.rs
@@ -26,9 +26,9 @@ use hudi::HudiDataSource;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
- let hudi = HudiDataSource::new("s3://hudi-demo/v6_complexkeygen_hivestyle").await?;
- ctx.register_table("v6_table", Arc::new(hudi))?;
- let df: DataFrame = ctx.sql("SELECT * from v6_table").await?;
+ let hudi = HudiDataSource::new("s3://hudi-demo/cow/v6_complexkeygen_hivestyle").await?;
+ ctx.register_table("cow_v6_table", Arc::new(hudi))?;
+ let df: DataFrame = ctx.sql("SELECT * from cow_v6_table").await?;
assert!(
df.schema()
.columns()
diff --git a/demo/infra/mc/prepare_data.sh b/demo/infra/mc/prepare_data.sh
index 2158b98..c432590 100755
--- a/demo/infra/mc/prepare_data.sh
+++ b/demo/infra/mc/prepare_data.sh
@@ -24,8 +24,11 @@ mc alias set local http://minio:9000 "$MINIO_ROOT_USER"
"$MINIO_ROOT_PASSWORD"
mc mb local/hudi-demo
# unzip the data
-mkdir /tmp/tables
-for zip in /opt/data/tables/*.zip; do unzip -o "$zip" -d "/tmp/tables/"; done
+mkdir -p /tmp/tables/cow/
+for zip in /opt/data/tables/cow/*.zip; do unzip -o "$zip" -d "/tmp/tables/cow/"; done
+mkdir -p /tmp/tables/mor/
+for zip in /opt/data/tables/mor/*.zip; do unzip -o "$zip" -d "/tmp/tables/mor/"; done
# copy the data to the bucket
-mc cp -r /tmp/tables/* local/hudi-demo/
+mc cp -r /tmp/tables/cow/* local/hudi-demo/cow/
+mc cp -r /tmp/tables/mor/* local/hudi-demo/mor/
diff --git a/python/src/internal.rs b/python/src/internal.rs
index e11b21a..b2a667b 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -27,8 +27,8 @@ use tokio::runtime::Runtime;
use hudi::error::CoreError;
use hudi::expr::filter::Filter;
+use hudi::file_group::file_slice::FileSlice;
use hudi::file_group::reader::FileGroupReader;
-use hudi::file_group::FileSlice;
use hudi::storage::error::StorageError;
use hudi::table::builder::TableBuilder;
use hudi::table::Table;
@@ -128,7 +128,7 @@ fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
let file_id = f.file_id().to_string();
let partition_path = f.partition_path().to_string();
let creation_instant_time = f.creation_instant_time().to_string();
- let base_file_name = f.base_file.file_name.clone();
+ let base_file_name = f.base_file.file_name();
let file_metadata = f.base_file.file_metadata.clone().unwrap_or_default();
let base_file_size = file_metadata.size;
let base_file_byte_size = file_metadata.byte_size;