This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 42800d9 feat: implement log file reader for parquet log block (#244)
42800d9 is described below
commit 42800d99cf6361fdfebcd0eb5174e33f3c11170d
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Jan 14 00:50:15 2025 -0600
feat: implement log file reader for parquet log block (#244)
---
crates/core/src/error.rs | 9 +
crates/core/src/file_group/base_file.rs | 26 +-
crates/core/src/file_group/builder.rs | 16 +-
crates/core/src/file_group/log_file/log_block.rs | 306 +++++++++++++++++++++
crates/core/src/file_group/log_file/log_format.rs | 180 ++++++++++++
crates/core/src/file_group/log_file/mod.rs | 185 +++++++++++++
crates/core/src/file_group/log_file/reader.rs | 278 +++++++++++++++++++
crates/core/src/file_group/mod.rs | 23 +-
crates/core/src/storage/error.rs | 3 +
crates/core/src/storage/mod.rs | 14 +-
crates/core/src/storage/reader.rs | 50 ++++
crates/core/src/table/fs_view.rs | 8 +-
...a144b5ea064-0_20250113230302428.log.1_0-188-387 | Bin 0 -> 5311 bytes
python/hudi/_internal.pyi | 4 +-
python/src/internal.rs | 6 +-
15 files changed, 1063 insertions(+), 45 deletions(-)
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index bf35058..48db16f 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -42,12 +42,21 @@ pub enum CoreError {
#[error("{0}")]
ReadFileSliceError(String),
+ #[error("{0}")]
+ LogFormatError(String),
+
+ #[error("{0}")]
+ LogBlockError(String),
+
#[error("{0}")]
InvalidPartitionPath(String),
#[error(transparent)]
ParquetError(#[from] parquet::errors::ParquetError),
+ #[error("{0}")]
+ ReadLogFileError(#[from] std::io::Error),
+
#[error("Storage error: {0}")]
Storage(#[from] StorageError),
diff --git a/crates/core/src/file_group/base_file.rs
b/crates/core/src/file_group/base_file.rs
index 6f26c85..7c4fcfb 100644
--- a/crates/core/src/file_group/base_file.rs
+++ b/crates/core/src/file_group/base_file.rs
@@ -27,7 +27,7 @@ pub struct BaseFile {
pub file_name: String,
/// The id of the enclosing file group.
- pub file_group_id: String,
+ pub file_id: String,
/// The associated instant time of the base file.
pub instant_time: String,
@@ -37,14 +37,14 @@ pub struct BaseFile {
}
impl BaseFile {
- /// Parse file name and extract `file_group_id` and `instant_time`.
+ /// Parse file name and extract `file_id` and `instant_time`.
fn parse_file_name(file_name: &str) -> Result<(String, String)> {
let err_msg = format!("Failed to parse file name '{file_name}' for
base file.");
let (name, _) = file_name
.rsplit_once('.')
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
let parts: Vec<&str> = name.split('_').collect();
- let file_group_id = parts
+ let file_id = parts
.first()
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
.to_string();
@@ -52,7 +52,7 @@ impl BaseFile {
.get(2)
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
.to_string();
- Ok((file_group_id, instant_time))
+ Ok((file_id, instant_time))
}
}
@@ -60,10 +60,10 @@ impl TryFrom<&str> for BaseFile {
type Error = CoreError;
fn try_from(file_name: &str) -> Result<Self> {
- let (file_group_id, instant_time) = Self::parse_file_name(file_name)?;
+ let (file_id, instant_time) = Self::parse_file_name(file_name)?;
Ok(Self {
file_name: file_name.to_string(),
- file_group_id,
+ file_id,
instant_time,
file_metadata: None,
})
@@ -75,10 +75,10 @@ impl TryFrom<FileMetadata> for BaseFile {
fn try_from(metadata: FileMetadata) -> Result<Self> {
let file_name = metadata.name.clone();
- let (file_group_id, instant_time) = Self::parse_file_name(&file_name)?;
+ let (file_id, instant_time) = Self::parse_file_name(&file_name)?;
Ok(Self {
file_name,
- file_group_id,
+ file_id,
instant_time,
file_metadata: Some(metadata),
})
@@ -94,10 +94,7 @@ mod tests {
fn test_create_base_file_from_file_name() {
let file_name =
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
let base_file = BaseFile::try_from(file_name).unwrap();
- assert_eq!(
- base_file.file_group_id,
- "5a226868-2934-4f84-a16f-55124630c68d-0"
- );
+ assert_eq!(base_file.file_id,
"5a226868-2934-4f84-a16f-55124630c68d-0");
assert_eq!(base_file.instant_time, "20240402144910683");
assert!(base_file.file_metadata.is_none());
}
@@ -109,10 +106,7 @@ mod tests {
1024,
);
let base_file = BaseFile::try_from(metadata).unwrap();
- assert_eq!(
- base_file.file_group_id,
- "5a226868-2934-4f84-a16f-55124630c68d-0"
- );
+ assert_eq!(base_file.file_id,
"5a226868-2934-4f84-a16f-55124630c68d-0");
assert_eq!(base_file.instant_time, "20240402144910683");
let file_metadata = base_file.file_metadata.unwrap();
assert_eq!(file_metadata.size, 1024);
diff --git a/crates/core/src/file_group/builder.rs
b/crates/core/src/file_group/builder.rs
index 9f50360..680b55d 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -41,7 +41,7 @@ pub fn build_file_groups(commit_metadata: &Map<String,
Value>) -> Result<HashSet
let partition = (!partition.is_empty()).then(|| partition.to_string());
for stat in write_stats {
- let file_group_id = stat
+ let file_id = stat
.get("fileId")
.and_then(|v| v.as_str())
.ok_or_else(|| CoreError::CommitMetadata("Invalid fileId in
write stats".into()))?;
@@ -57,7 +57,7 @@ pub fn build_file_groups(commit_metadata: &Map<String,
Value>) -> Result<HashSet
.ok_or_else(|| CoreError::CommitMetadata("Invalid file name in
path".into()))?;
let file_group = FileGroup::new_with_base_file_name(
- file_group_id.to_string(),
+ file_id.to_string(),
partition.clone(),
file_name,
)?;
@@ -80,15 +80,15 @@ pub fn build_replaced_file_groups(
let mut file_groups = HashSet::new();
- for (partition, file_group_ids_value) in partition_to_replaced {
- let file_group_ids = file_group_ids_value
+ for (partition, file_ids_value) in partition_to_replaced {
+ let file_ids = file_ids_value
.as_array()
.ok_or_else(|| CoreError::CommitMetadata("Invalid file group ids
array".into()))?;
let partition = (!partition.is_empty()).then(|| partition.to_string());
- for file_group_id in file_group_ids {
- let id = file_group_id
+ for file_id in file_ids {
+ let id = file_id
.as_str()
.ok_or_else(|| CoreError::CommitMetadata("Invalid file group
id string".into()))?;
@@ -300,7 +300,7 @@ mod tests {
}
#[test]
- fn test_invalid_file_group_ids_array() {
+ fn test_invalid_file_ids_array() {
let metadata: Map<String, Value> = json!({
"partitionToReplaceFileIds": {
"20": "not_an_array"
@@ -318,7 +318,7 @@ mod tests {
}
#[test]
- fn test_invalid_file_group_id_type() {
+ fn test_invalid_file_id_type() {
let metadata: Map<String, Value> = json!({
"partitionToReplaceFileIds": {
"20": [123] // number instead of string
diff --git a/crates/core/src/file_group/log_file/log_block.rs
b/crates/core/src/file_group/log_file/log_block.rs
new file mode 100644
index 0000000..387b709
--- /dev/null
+++ b/crates/core/src/file_group/log_file/log_block.rs
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::file_group::log_file::log_format::LogFormatVersion;
+use crate::Result;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[allow(dead_code)]
+pub enum BlockType {
+ Command = 0,
+ Delete = 1,
+ Corrupted = 2,
+ AvroData = 3,
+ HfileData = 4,
+ ParquetData = 5,
+ CdcData = 6,
+}
+
+impl AsRef<str> for BlockType {
+ fn as_ref(&self) -> &str {
+ match self {
+ BlockType::Command => ":command",
+ BlockType::Delete => ":delete",
+ BlockType::Corrupted => ":corrupted",
+ BlockType::AvroData => "avro",
+ BlockType::HfileData => "hfile",
+ BlockType::ParquetData => "parquet",
+ BlockType::CdcData => "cdc",
+ }
+ }
+}
+
+impl FromStr for BlockType {
+ type Err = CoreError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ ":command" => Ok(BlockType::Command),
+ ":delete" => Ok(BlockType::Delete),
+ ":corrupted" => Ok(BlockType::Corrupted),
+ "avro_data" => Ok(BlockType::AvroData),
+ "hfile" => Ok(BlockType::HfileData),
+ "parquet" => Ok(BlockType::ParquetData),
+ "cdc" => Ok(BlockType::CdcData),
+ _ => Err(CoreError::LogFormatError(format!(
+ "Invalid block type: {s}"
+ ))),
+ }
+ }
+}
+
+impl TryFrom<[u8; 4]> for BlockType {
+ type Error = CoreError;
+
+ fn try_from(value_bytes: [u8; 4]) -> Result<Self, Self::Error> {
+ let value = u32::from_be_bytes(value_bytes);
+ match value {
+ 0 => Ok(BlockType::Command),
+ 1 => Ok(BlockType::Delete),
+ 2 => Ok(BlockType::Corrupted),
+ 3 => Ok(BlockType::AvroData),
+ 4 => Ok(BlockType::HfileData),
+ 5 => Ok(BlockType::ParquetData),
+ 6 => Ok(BlockType::CdcData),
+ _ => Err(CoreError::LogFormatError(format!(
+ "Invalid block type: {value}"
+ ))),
+ }
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum BlockMetadataType {
+ Header,
+ Footer,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[repr(u32)]
+pub enum BlockMetadataKey {
+ InstantTime = 0,
+ TargetInstantTime = 1,
+ Schema = 2,
+ CommandBlockType = 3,
+ CompactedBlockTimes = 4,
+ RecordPositions = 5,
+ BlockIdentifier = 6,
+}
+
+impl TryFrom<[u8; 4]> for BlockMetadataKey {
+ type Error = CoreError;
+
+ fn try_from(value_bytes: [u8; 4]) -> Result<Self, Self::Error> {
+ let value = u32::from_be_bytes(value_bytes);
+ match value {
+ 0 => Ok(Self::InstantTime),
+ 1 => Ok(Self::TargetInstantTime),
+ 2 => Ok(Self::Schema),
+ 3 => Ok(Self::CommandBlockType),
+ 4 => Ok(Self::CompactedBlockTimes),
+ 5 => Ok(Self::RecordPositions),
+ 6 => Ok(Self::BlockIdentifier),
+ _ => Err(CoreError::LogFormatError(format!(
+ "Invalid header key: {value}"
+ ))),
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct LogBlock {
+ pub format_version: LogFormatVersion,
+ pub block_type: BlockType,
+ pub header: HashMap<BlockMetadataKey, String>,
+ pub record_batches: Vec<RecordBatch>,
+ pub footer: HashMap<BlockMetadataKey, String>,
+}
+
+impl LogBlock {
+ pub fn decode_content(block_type: &BlockType, content: Vec<u8>) ->
Result<Vec<RecordBatch>> {
+ match block_type {
+ BlockType::ParquetData => {
+ let record_bytes = Bytes::from(content);
+ let parquet_reader =
ParquetRecordBatchReader::try_new(record_bytes, 1024)?;
+ let mut batches = Vec::new();
+ for item in parquet_reader {
+ let batch = item.map_err(CoreError::ArrowError)?;
+ batches.push(batch);
+ }
+ Ok(batches)
+ }
+ _ => Err(CoreError::LogBlockError(format!(
+ "Unsupported block type: {block_type:?}"
+ ))),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::{ArrayRef, Int64Array, StringArray};
+ use arrow_schema::{DataType, Field, Schema};
+ use parquet::arrow::ArrowWriter;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_block_type_as_ref() {
+ assert_eq!(BlockType::Command.as_ref(), ":command");
+ assert_eq!(BlockType::Delete.as_ref(), ":delete");
+ assert_eq!(BlockType::Corrupted.as_ref(), ":corrupted");
+ assert_eq!(BlockType::AvroData.as_ref(), "avro");
+ assert_eq!(BlockType::HfileData.as_ref(), "hfile");
+ assert_eq!(BlockType::ParquetData.as_ref(), "parquet");
+ assert_eq!(BlockType::CdcData.as_ref(), "cdc");
+ }
+
+ #[test]
+ fn test_block_type_from_str() {
+ assert_eq!(BlockType::from_str(":command").unwrap(),
BlockType::Command);
+ assert_eq!(BlockType::from_str(":delete").unwrap(), BlockType::Delete);
+ assert_eq!(
+ BlockType::from_str(":corrupted").unwrap(),
+ BlockType::Corrupted
+ );
+ assert_eq!(
+ BlockType::from_str("avro_data").unwrap(),
+ BlockType::AvroData
+ );
+ assert_eq!(BlockType::from_str("hfile").unwrap(),
BlockType::HfileData);
+ assert_eq!(
+ BlockType::from_str("parquet").unwrap(),
+ BlockType::ParquetData
+ );
+ assert_eq!(BlockType::from_str("cdc").unwrap(), BlockType::CdcData);
+
+ // Test invalid block type
+ assert!(BlockType::from_str("invalid").is_err());
+ }
+
+ #[test]
+ fn test_block_type_try_from_bytes() {
+ assert_eq!(
+ BlockType::try_from([0, 0, 0, 0]).unwrap(),
+ BlockType::Command
+ );
+ assert_eq!(
+ BlockType::try_from([0, 0, 0, 1]).unwrap(),
+ BlockType::Delete
+ );
+ assert_eq!(
+ BlockType::try_from([0, 0, 0, 2]).unwrap(),
+ BlockType::Corrupted
+ );
+ assert_eq!(
+ BlockType::try_from([0, 0, 0, 3]).unwrap(),
+ BlockType::AvroData
+ );
+ assert_eq!(
+ BlockType::try_from([0, 0, 0, 4]).unwrap(),
+ BlockType::HfileData
+ );
+ assert_eq!(
+ BlockType::try_from([0, 0, 0, 5]).unwrap(),
+ BlockType::ParquetData
+ );
+ assert_eq!(
+ BlockType::try_from([0, 0, 0, 6]).unwrap(),
+ BlockType::CdcData
+ );
+
+ // Test invalid block type
+ assert!(BlockType::try_from([0, 0, 0, 7]).is_err());
+ }
+
+ #[test]
+ fn test_block_metadata_key_try_from_bytes() {
+ assert_eq!(
+ BlockMetadataKey::try_from([0, 0, 0, 0]).unwrap(),
+ BlockMetadataKey::InstantTime
+ );
+ assert_eq!(
+ BlockMetadataKey::try_from([0, 0, 0, 1]).unwrap(),
+ BlockMetadataKey::TargetInstantTime
+ );
+ assert_eq!(
+ BlockMetadataKey::try_from([0, 0, 0, 2]).unwrap(),
+ BlockMetadataKey::Schema
+ );
+ assert_eq!(
+ BlockMetadataKey::try_from([0, 0, 0, 3]).unwrap(),
+ BlockMetadataKey::CommandBlockType
+ );
+ assert_eq!(
+ BlockMetadataKey::try_from([0, 0, 0, 4]).unwrap(),
+ BlockMetadataKey::CompactedBlockTimes
+ );
+ assert_eq!(
+ BlockMetadataKey::try_from([0, 0, 0, 5]).unwrap(),
+ BlockMetadataKey::RecordPositions
+ );
+ assert_eq!(
+ BlockMetadataKey::try_from([0, 0, 0, 6]).unwrap(),
+ BlockMetadataKey::BlockIdentifier
+ );
+
+ // Test invalid metadata key
+ assert!(BlockMetadataKey::try_from([0, 0, 0, 7]).is_err());
+ }
+
+ #[test]
+ fn test_decode_parquet_content() -> Result<()> {
+ // Create sample parquet bytes
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("name", DataType::Utf8, false),
+ ]));
+
+ let ids = Int64Array::from(vec![1, 2, 3]);
+ let names = StringArray::from(vec!["a", "b", "c"]);
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
+ )?;
+
+ let mut buf = Vec::new();
+ {
+ let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
+ writer.write(&batch)?;
+ writer.close()?;
+ }
+
+ // Test decoding the parquet content
+ let batches = LogBlock::decode_content(&BlockType::ParquetData, buf)?;
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches[0].num_rows(), 3);
+
+ // Test decoding with unsupported block type
+ assert!(LogBlock::decode_content(&BlockType::AvroData,
vec![]).is_err());
+
+ Ok(())
+ }
+}
diff --git a/crates/core/src/file_group/log_file/log_format.rs
b/crates/core/src/file_group/log_file/log_format.rs
new file mode 100644
index 0000000..290d668
--- /dev/null
+++ b/crates/core/src/file_group/log_file/log_format.rs
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::Result;
+
+pub const MAGIC: &[u8] = b"#HUDI#";
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[repr(u32)]
+pub enum LogFormatVersion {
+ V0 = 0,
+ V1 = 1,
+ V2 = 2,
+ V3 = 3,
+}
+
+impl TryFrom<[u8; 4]> for LogFormatVersion {
+ type Error = CoreError;
+
+ fn try_from(value_bytes: [u8; 4]) -> Result<Self, Self::Error> {
+ let value = u32::from_be_bytes(value_bytes);
+ Self::try_from(value)
+ }
+}
+
+impl TryFrom<u32> for LogFormatVersion {
+ type Error = CoreError;
+
+ fn try_from(value: u32) -> std::result::Result<Self, Self::Error> {
+ match value {
+ 0 => Ok(Self::V0),
+ 1 => Ok(Self::V1),
+ 2 => Ok(Self::V2),
+ 3 => Ok(Self::V3),
+ _ => Err(CoreError::LogFormatError(format!(
+ "Invalid log format version: {value}"
+ ))),
+ }
+ }
+}
+
+impl LogFormatVersion {
+ #[inline]
+ pub fn has_block_type(&self) -> bool {
+ !matches!(self, LogFormatVersion::V0)
+ }
+
+ #[inline]
+ pub fn has_header(&self) -> bool {
+ !matches!(self, LogFormatVersion::V0)
+ }
+
+ #[inline]
+ pub fn has_content_length(&self) -> bool {
+ !matches!(self, LogFormatVersion::V0)
+ }
+
+ #[inline]
+ pub fn has_footer(&self) -> bool {
+ matches!(self, LogFormatVersion::V1)
+ }
+
+ #[inline]
+ pub fn has_total_log_block_length(&self) -> bool {
+ matches!(self, LogFormatVersion::V1)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_log_format_version_from_bytes() {
+ // Test valid versions
+ assert_eq!(
+ LogFormatVersion::try_from([0, 0, 0, 0]).unwrap(),
+ LogFormatVersion::V0
+ );
+ assert_eq!(
+ LogFormatVersion::try_from([0, 0, 0, 1]).unwrap(),
+ LogFormatVersion::V1
+ );
+ assert_eq!(
+ LogFormatVersion::try_from([0, 0, 0, 2]).unwrap(),
+ LogFormatVersion::V2
+ );
+ assert_eq!(
+ LogFormatVersion::try_from([0, 0, 0, 3]).unwrap(),
+ LogFormatVersion::V3
+ );
+
+ // Test invalid version
+ let err = LogFormatVersion::try_from([0, 0, 0, 4]).unwrap_err();
+ assert!(matches!(err, CoreError::LogFormatError(_)));
+ assert!(err.to_string().contains("Invalid log format version: 4"));
+ }
+
+ #[test]
+ fn test_log_format_version_from_u32() {
+ // Test valid versions
+ assert_eq!(
+ LogFormatVersion::try_from(0u32).unwrap(),
+ LogFormatVersion::V0
+ );
+ assert_eq!(
+ LogFormatVersion::try_from(1u32).unwrap(),
+ LogFormatVersion::V1
+ );
+ assert_eq!(
+ LogFormatVersion::try_from(2u32).unwrap(),
+ LogFormatVersion::V2
+ );
+ assert_eq!(
+ LogFormatVersion::try_from(3u32).unwrap(),
+ LogFormatVersion::V3
+ );
+
+ // Test invalid version
+ let err = LogFormatVersion::try_from(4u32).unwrap_err();
+ assert!(matches!(err, CoreError::LogFormatError(_)));
+ assert!(err.to_string().contains("Invalid log format version: 4"));
+ }
+
+ #[test]
+ fn test_version_feature_flags_v0() {
+ let version = LogFormatVersion::V0;
+ assert!(!version.has_block_type());
+ assert!(!version.has_header());
+ assert!(!version.has_content_length());
+ assert!(!version.has_footer());
+ assert!(!version.has_total_log_block_length());
+ }
+
+ #[test]
+ fn test_version_feature_flags_v1() {
+ let version = LogFormatVersion::V1;
+ assert!(version.has_block_type());
+ assert!(version.has_header());
+ assert!(version.has_content_length());
+ assert!(version.has_footer());
+ assert!(version.has_total_log_block_length());
+ }
+
+ #[test]
+ fn test_version_feature_flags_v2() {
+ let version = LogFormatVersion::V2;
+ assert!(version.has_block_type());
+ assert!(version.has_header());
+ assert!(version.has_content_length());
+ assert!(!version.has_footer());
+ assert!(!version.has_total_log_block_length());
+ }
+
+ #[test]
+ fn test_version_feature_flags_v3() {
+ let version = LogFormatVersion::V3;
+ assert!(version.has_block_type());
+ assert!(version.has_header());
+ assert!(version.has_content_length());
+ assert!(!version.has_footer());
+ assert!(!version.has_total_log_block_length());
+ }
+}
diff --git a/crates/core/src/file_group/log_file/mod.rs
b/crates/core/src/file_group/log_file/mod.rs
new file mode 100644
index 0000000..46dd403
--- /dev/null
+++ b/crates/core/src/file_group/log_file/mod.rs
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::Result;
+use std::str::FromStr;
+
+mod log_block;
+mod log_format;
+pub mod reader;
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct LogFile {
+ file_id: String,
+ base_commit_timestamp: String,
+ log_file_extension: String,
+ log_file_version: String,
+ file_write_token: String,
+}
+
+const LOG_FILE_PREFIX: char = '.';
+
+impl FromStr for LogFile {
+ type Err = CoreError;
+
+ /// Parse a log file name into a [LogFile].
+ ///
+ /// File name format:
+ ///
+ /// ```text
+ /// .[File Id]_[Base Commit Timestamp].[Log File Extension].[Log File
Version]_[File Write Token]
+ /// ```
+ fn from_str(file_name: &str) -> Result<Self, Self::Err> {
+ let err_msg = format!("Failed to parse file name '{file_name}' for log
file.");
+
+ if !file_name.starts_with(LOG_FILE_PREFIX) {
+ return Err(CoreError::FileGroup(err_msg));
+ }
+
+ let file_name = &file_name[LOG_FILE_PREFIX.len_utf8()..];
+
+ let (file_id, rest) = file_name
+ .split_once('_')
+ .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
+
+ let (middle, file_write_token) = rest
+ .rsplit_once('_')
+ .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
+
+ let parts: Vec<&str> = middle.split('.').collect();
+ if parts.len() != 3 {
+ return Err(CoreError::FileGroup(err_msg.clone()));
+ }
+
+ let base_commit_timestamp = parts[0];
+ let log_file_extension = parts[1];
+ let log_file_version = parts[2];
+
+ if file_id.is_empty()
+ || base_commit_timestamp.is_empty()
+ || log_file_extension.is_empty()
+ || log_file_version.is_empty()
+ || file_write_token.is_empty()
+ {
+ return Err(CoreError::FileGroup(err_msg.clone()));
+ }
+
+ Ok(LogFile {
+ file_id: file_id.to_string(),
+ base_commit_timestamp: base_commit_timestamp.to_string(),
+ log_file_extension: log_file_extension.to_string(),
+ log_file_version: log_file_version.to_string(),
+ file_write_token: file_write_token.to_string(),
+ })
+ }
+}
+
+impl LogFile {
+ /// Returns the file name of the log file.
+ #[inline]
+ pub fn file_name(&self) -> String {
+ format!(
+
"{prefix}{file_id}_{base_commit_timestamp}.{log_file_extension}.{log_file_version}_{file_write_token}",
+ prefix = LOG_FILE_PREFIX,
+ file_id = self.file_id,
+ base_commit_timestamp = self.base_commit_timestamp,
+ log_file_extension = self.log_file_extension,
+ log_file_version = self.log_file_version,
+ file_write_token = self.file_write_token
+ )
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_valid_filename_parsing() {
+ let filename =
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115";
+ let log_file = LogFile::from_str(filename).unwrap();
+
+ assert_eq!(log_file.file_id, "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0");
+ assert_eq!(log_file.base_commit_timestamp, "20250109233025121");
+ assert_eq!(log_file.log_file_extension, "log");
+ assert_eq!(log_file.log_file_version, "1");
+ assert_eq!(log_file.file_write_token, "0-51-115");
+ }
+
+ #[test]
+ fn test_filename_reconstruction() {
+ let original =
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115";
+ let log_file = LogFile::from_str(original).unwrap();
+ assert_eq!(log_file.file_name(), original);
+ }
+
+ #[test]
+ fn test_missing_dot_prefix() {
+ let filename = "myfile_20250109233025121.log.v1_abc123";
+ assert!(matches!(
+ LogFile::from_str(filename),
+ Err(CoreError::FileGroup(_))
+ ));
+ }
+
+ #[test]
+ fn test_missing_first_underscore() {
+ let filename = ".myfile20250109233025121.log.v1_abc123";
+ assert!(matches!(
+ LogFile::from_str(filename),
+ Err(CoreError::FileGroup(_))
+ ));
+ }
+
+ #[test]
+ fn test_missing_last_underscore() {
+ let filename = ".myfile_20250109233025121.log.v1abc123";
+ assert!(matches!(
+ LogFile::from_str(filename),
+ Err(CoreError::FileGroup(_))
+ ));
+ }
+
+ #[test]
+ fn test_incorrect_dot_parts() {
+ let filename = ".myfile_20250109233025121.log.v1.extra_abc123";
+ assert!(matches!(
+ LogFile::from_str(filename),
+ Err(CoreError::FileGroup(_))
+ ));
+ }
+
+ #[test]
+ fn test_empty_components() {
+ let filenames = vec![
+ "._20250109233025121.log.v1_abc123", // empty file_id
+ ".myfile_.log.v1_abc123", // empty timestamp
+ ".myfile_20250109233025121..v1_abc123", // empty extension
+ ".myfile_20250109233025121.log._abc123", // empty version
+ ".myfile_20250109233025121.log.v1_", // empty token
+ ];
+
+ for filename in filenames {
+ assert!(matches!(
+ LogFile::from_str(filename),
+ Err(CoreError::FileGroup(_))
+ ));
+ }
+ }
+}
diff --git a/crates/core/src/file_group/log_file/reader.rs
b/crates/core/src/file_group/log_file/reader.rs
new file mode 100644
index 0000000..84b848b
--- /dev/null
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::file_group::log_file::log_block::{
+ BlockMetadataKey, BlockMetadataType, BlockType, LogBlock,
+};
+use crate::file_group::log_file::log_format::{LogFormatVersion, MAGIC};
+use crate::storage::reader::StorageReader;
+use crate::storage::Storage;
+use crate::Result;
+use bytes::BytesMut;
+use std::collections::HashMap;
+use std::io::{self, Read, Seek};
+use std::sync::Arc;
+
+pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024 * 1024;
+
+#[allow(dead_code)]
+#[derive(Debug)]
+pub struct LogFileReader<R> {
+ storage: Arc<Storage>,
+ reader: R,
+ buffer: BytesMut,
+}
+
+impl LogFileReader<StorageReader> {
+ pub async fn new(storage: Arc<Storage>, relative_path: &str) ->
Result<Self> {
+ let reader = storage.get_storage_reader(relative_path).await?;
+ Ok(Self {
+ storage,
+ reader,
+ buffer: BytesMut::with_capacity(DEFAULT_BUFFER_SIZE),
+ })
+ }
+
+ pub fn read_all_blocks(mut self) -> Result<Vec<LogBlock>> {
+ let mut blocks = Vec::new();
+ while let Some(block) = self.read_next_block()? {
+ blocks.push(block);
+ }
+ Ok(blocks)
+ }
+}
+
+impl<R: Read + Seek> LogFileReader<R> {
+ /// Read [`MAGIC`] from the log file.
+ ///
+ /// Returns `Ok(true)` if the magic bytes are read successfully.
+ ///
+ /// Returns `Ok(false)` if the end of the file is reached.
+ ///
+ /// Returns an error if the magic bytes are invalid or an I/O error occurs.
+ fn read_magic(&mut self) -> Result<bool> {
+ let mut magic = [0u8; 6];
+ match self.reader.read_exact(&mut magic) {
+ Ok(_) => {
+ if magic != MAGIC {
+ return Err(CoreError::LogFormatError("Invalid
magic".to_string()));
+ }
+ Ok(true)
+ }
+ Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
+ Err(e) => Err(CoreError::ReadLogFileError(e)),
+ }
+ }
+
+ /// Read 8 bytes for the log block's length excluding the magic.
+ fn read_block_length(&mut self) -> Result<u64> {
+ let mut size_buf = [0u8; 8];
+ self.reader.read_exact(&mut size_buf)?;
+ Ok(u64::from_be_bytes(size_buf))
+ }
+
+ fn create_corrupted_block_if_needed(
+ &mut self,
+ _curent_pos: u64,
+ _block_length: Option<u64>,
+ ) -> Option<LogBlock> {
+ // TODO: support creating corrupted block
+ None
+ }
+
+ fn read_block_length_or_corrupted_block(
+ &mut self,
+ start_pos: u64,
+ ) -> Result<(u64, Option<LogBlock>)> {
+ match self.read_block_length() {
+ Ok(length) => {
+ if let Some(block) =
self.create_corrupted_block_if_needed(start_pos, Some(length))
+ {
+ Ok((0, Some(block)))
+ } else {
+ Ok((length, None))
+ }
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Read 4 bytes for [`LogFormatVersion`].
+ fn read_format_version(&mut self) -> Result<LogFormatVersion> {
+ let mut version_buf = [0u8; 4];
+ self.reader.read_exact(&mut version_buf)?;
+ LogFormatVersion::try_from(version_buf)
+ }
+
+ /// Read 4 bytes for [`BlockType`].
+ fn read_block_type(&mut self, format_version: &LogFormatVersion) ->
Result<BlockType> {
+ if !format_version.has_block_type() {
+ return Err(CoreError::LogFormatError(
+ "Block type is not supported".to_string(),
+ ));
+ }
+ let mut type_buf = [0u8; 4];
+ self.reader.read_exact(&mut type_buf)?;
+ BlockType::try_from(type_buf)
+ }
+
+ /// First, read 4 bytes for the number of entries in the metadata section
(header or footer).
+ ///
+ /// Then for each entry,
+ /// 1. Read 4 bytes for the key
+ /// 2. Read 4 bytes for the value's length
+ /// 3. Read the bytes of the length for the value
+ ///
+ /// See also [`BlockMetadataKey`].
+ fn read_block_metadata(
+ &mut self,
+ metadata_type: BlockMetadataType,
+ format_version: &LogFormatVersion,
+ ) -> Result<HashMap<BlockMetadataKey, String>> {
+ match metadata_type {
+ BlockMetadataType::Header if !format_version.has_header() => {
+ return Ok(HashMap::new());
+ }
+ BlockMetadataType::Footer if !format_version.has_footer() => {
+ return Ok(HashMap::new());
+ }
+ _ => {}
+ }
+ let mut num_entries_buf = [0u8; 4];
+ self.reader.read_exact(&mut num_entries_buf)?;
+ let num_entries = u32::from_be_bytes(num_entries_buf);
+ let mut metadata: HashMap<BlockMetadataKey, String> =
+ HashMap::with_capacity(num_entries as usize);
+ for _ in 0..num_entries {
+ let mut key_buf = [0u8; 4];
+ self.reader.read_exact(&mut key_buf)?;
+ let key = BlockMetadataKey::try_from(key_buf)?;
+ let mut value_len_buf = [0u8; 4];
+ self.reader.read_exact(&mut value_len_buf)?;
+ let value_len = u32::from_be_bytes(value_len_buf);
+ let mut value_buf = vec![0u8; value_len as usize];
+ self.reader.read_exact(&mut value_buf)?;
+ let value =
+ String::from_utf8(value_buf).map_err(|e|
CoreError::Utf8Error(e.utf8_error()))?;
+ metadata.insert(key, value);
+ }
+ Ok(metadata)
+ }
+
+ /// Read the content of the log block.
+ fn read_content(
+ &mut self,
+ format_version: &LogFormatVersion,
+ fallback_length: u64,
+ ) -> Result<Vec<u8>> {
+ let content_length = if format_version.has_content_length() {
+ let mut content_length_buf = [0u8; 8];
+ self.reader.read_exact(&mut content_length_buf)?;
+ u64::from_be_bytes(content_length_buf)
+ } else {
+ fallback_length
+ };
+ let mut content_buf = vec![0u8; content_length as usize];
+ self.reader.read_exact(&mut content_buf)?;
+ Ok(content_buf)
+ }
+
+ /// Read 8 bytes for the total length of the log block.
+ fn read_total_block_length(
+ &mut self,
+ format_version: &LogFormatVersion,
+ ) -> Result<Option<u64>> {
+ if !format_version.has_total_log_block_length() {
+ return Ok(None);
+ }
+ let mut size_buf = [0u8; 8];
+ self.reader.read_exact(&mut size_buf)?;
+ Ok(Some(u64::from_be_bytes(size_buf)))
+ }
+
+ fn read_next_block(&mut self) -> Result<Option<LogBlock>> {
+ if !self.read_magic()? {
+ return Ok(None);
+ }
+
+ let curr_pos = self
+ .reader
+ .stream_position()
+ .map_err(CoreError::ReadLogFileError)?;
+
+ let (block_length, _) =
self.read_block_length_or_corrupted_block(curr_pos)?;
+ let format_version = self.read_format_version()?;
+ let block_type = self.read_block_type(&format_version)?;
+ let header = self.read_block_metadata(BlockMetadataType::Header,
&format_version)?;
+ let content = self.read_content(&format_version, block_length)?;
+ let record_batches = LogBlock::decode_content(&block_type, content)?;
+ let footer = self.read_block_metadata(BlockMetadataType::Footer,
&format_version)?;
+ let _ = self.read_total_block_length(&format_version)?;
+
+ Ok(Some(LogBlock {
+ format_version,
+ block_type,
+ header,
+ record_batches,
+ footer,
+ }))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::fs::canonicalize;
+ use std::path::PathBuf;
+ use url::Url;
+
+ fn get_sample_log_file() -> (PathBuf, String) {
+ let dir = PathBuf::from("tests/data/log_files/valid_log_parquet");
+ (
+ canonicalize(dir).unwrap(),
+
".ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387".to_string(),
+ )
+ }
+
+ #[tokio::test]
+ async fn test_read_sample_log_file() {
+ let (dir, file_name) = get_sample_log_file();
+ let dir_url = Url::from_directory_path(dir).unwrap();
+ let storage = Storage::new_with_base_url(dir_url).unwrap();
+ let reader = LogFileReader::new(storage, &file_name).await.unwrap();
+ let blocks = reader.read_all_blocks().unwrap();
+ assert_eq!(blocks.len(), 1);
+
+ let block = &blocks[0];
+ assert_eq!(block.format_version, LogFormatVersion::V1);
+ assert_eq!(block.block_type, BlockType::ParquetData);
+ assert_eq!(block.header.len(), 2);
+ assert_eq!(
+ block.header.get(&BlockMetadataKey::InstantTime).unwrap(),
+ "20250113230424191"
+ );
+ assert!(block.header.contains_key(&BlockMetadataKey::Schema));
+ assert_eq!(block.footer.len(), 0);
+
+ let batches = block.record_batches.as_slice();
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches[0].num_rows(), 1);
+ }
+}
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index 8c92f4e..fce9786 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -22,6 +22,7 @@
pub mod base_file;
pub mod builder;
+pub mod log_file;
pub mod reader;
use crate::error::CoreError;
@@ -65,8 +66,8 @@ impl FileSlice {
/// Returns the enclosing [FileGroup]'s id.
#[inline]
- pub fn file_group_id(&self) -> &str {
- &self.base_file.file_group_id
+ pub fn file_id(&self) -> &str {
+ &self.base_file.file_id
}
/// Returns the partition path of the [FileSlice].
@@ -101,14 +102,14 @@ impl FileSlice {
/// Hudi File Group.
#[derive(Clone, Debug)]
pub struct FileGroup {
- pub id: String,
+ pub file_id: String,
pub partition_path: Option<String>,
pub file_slices: BTreeMap<String, FileSlice>,
}
impl PartialEq for FileGroup {
fn eq(&self, other: &Self) -> bool {
- self.id == other.id && self.partition_path == other.partition_path
+ self.file_id == other.file_id && self.partition_path == other.partition_path
}
}
@@ -116,7 +117,7 @@ impl Eq for FileGroup {}
impl Hash for FileGroup {
fn hash<H: Hasher>(&self, state: &mut H) {
- self.id.hash(state);
+ self.file_id.hash(state);
self.partition_path.hash(state);
}
}
@@ -126,7 +127,7 @@ impl fmt::Display for FileGroup {
f.write_str(
format!(
"File Group: partition {:?} id {}",
- &self.partition_path, &self.id
+ &self.partition_path, &self.file_id
)
.as_str(),
)
@@ -134,9 +135,9 @@ impl fmt::Display for FileGroup {
}
impl FileGroup {
- pub fn new(id: String, partition_path: Option<String>) -> Self {
+ pub fn new(file_id: String, partition_path: Option<String>) -> Self {
Self {
- id,
+ file_id,
partition_path,
file_slices: BTreeMap::new(),
}
@@ -162,7 +163,7 @@ impl FileGroup {
if self.file_slices.contains_key(instant_time) {
Err(CoreError::FileGroup(format!(
"Instant time {instant_time} is already present in File Group
{}",
- self.id
+ self.file_id
)))
} else {
self.file_slices.insert(
@@ -236,7 +237,7 @@ mod tests {
#[test]
fn test_file_group_display() {
let file_group = FileGroup {
- id: "group123".to_string(),
+ file_id: "group123".to_string(),
partition_path: Some("part/2023-01-01".to_string()),
file_slices: BTreeMap::new(),
};
@@ -249,7 +250,7 @@ mod tests {
);
let file_group_no_partition = FileGroup {
- id: "group456".to_string(),
+ file_id: "group456".to_string(),
partition_path: None,
file_slices: BTreeMap::new(),
};
diff --git a/crates/core/src/storage/error.rs b/crates/core/src/storage/error.rs
index 09c749d..a29dd6a 100644
--- a/crates/core/src/storage/error.rs
+++ b/crates/core/src/storage/error.rs
@@ -37,6 +37,9 @@ pub enum StorageError {
#[error(transparent)]
ObjectStorePathError(#[from] object_store::path::Error),
+ #[error(transparent)]
+ ReaderError(#[from] std::io::Error),
+
#[error(transparent)]
ParquetError(#[from] parquet::errors::ParquetError),
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 4f5aec3..85f83b9 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -36,13 +36,15 @@ use url::Url;
use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
-use crate::storage::error::Result;
use crate::storage::error::StorageError::{Creation, InvalidPath};
+use crate::storage::error::{Result, StorageError};
use crate::storage::file_metadata::FileMetadata;
+use crate::storage::reader::StorageReader;
use crate::storage::util::join_url_segments;
pub mod error;
pub mod file_metadata;
+pub mod reader;
pub mod util;
#[allow(dead_code)]
@@ -191,6 +193,16 @@ impl Storage {
Ok(concat_batches(&schema, &batches)?)
}
+ pub async fn get_storage_reader(&self, relative_path: &str) -> Result<StorageReader> {
+ let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+ let obj_path = ObjPath::from_url_path(obj_url.path())?;
+ let obj_store = self.object_store.clone();
+ let obj_meta = obj_store.head(&obj_path).await?;
+ StorageReader::new(obj_store, obj_meta)
+ .await
+ .map_err(StorageError::ReaderError)
+ }
+
pub async fn list_dirs(&self, subdir: Option<&str>) -> Result<Vec<String>>
{
let dir_paths = self.list_dirs_as_obj_paths(subdir).await?;
let mut dirs = Vec::new();
diff --git a/crates/core/src/storage/reader.rs
b/crates/core/src/storage/reader.rs
new file mode 100644
index 0000000..b012316
--- /dev/null
+++ b/crates/core/src/storage/reader.rs
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use bytes::Bytes;
+use object_store::{ObjectMeta, ObjectStore};
+use std::io::{BufReader, Cursor, Result};
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub struct StorageReader {
+ reader: BufReader<Cursor<Bytes>>,
+}
+
+impl StorageReader {
+ pub async fn new(object_store: Arc<dyn ObjectStore>, object_meta: ObjectMeta) -> Result<Self> {
+ let get_result = object_store.get(&object_meta.location).await?;
+ // TODO change to use stream
+ let bytes = get_result.bytes().await?;
+ let reader = BufReader::with_capacity(bytes.len(), Cursor::new(bytes));
+ Ok(Self { reader })
+ }
+}
+
+impl Read for StorageReader {
+ fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ self.reader.read(buf)
+ }
+}
+
+impl Seek for StorageReader {
+ fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+ self.reader.seek(pos)
+ }
+}
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 4e6aa1d..455e823 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -102,7 +102,7 @@ impl FileSystemView {
let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
for metadata in file_metadata {
let base_file = BaseFile::try_from(metadata)?;
- let fg_id = &base_file.file_group_id;
+ let fg_id = &base_file.file_id;
fg_id_to_base_files
.entry(fg_id.to_owned())
.or_default()
@@ -257,7 +257,7 @@ mod tests {
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
- .map(|fsl| fsl.file_group_id())
+ .map(|fsl| fsl.file_id())
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
for fsl in file_slices.iter() {
@@ -286,7 +286,7 @@ mod tests {
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
- .map(|fsl| fsl.file_group_id())
+ .map(|fsl| fsl.file_id())
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
for fsl in file_slices.iter() {
@@ -327,7 +327,7 @@ mod tests {
let fg_ids = file_slices
.iter()
- .map(|fsl| fsl.file_group_id())
+ .map(|fsl| fsl.file_id())
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
for fsl in file_slices.iter() {
diff --git
a/crates/core/tests/data/log_files/valid_log_parquet/.ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387
b/crates/core/tests/data/log_files/valid_log_parquet/.ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387
new file mode 100644
index 0000000..f0f4d14
Binary files /dev/null and
b/crates/core/tests/data/log_files/valid_log_parquet/.ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387
differ
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index d5e046f..eb4b110 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -60,7 +60,7 @@ class HudiFileSlice:
the partition it belongs to, and associated metadata.
Attributes:
- file_group_id (str): The id of the file group this file slice belongs
to.
+ file_id (str): The id of the file group this file slice belongs to.
partition_path (str): The path of the partition containing this file
slice.
creation_instant_time (str): The creation instant time of this file
slice.
base_file_name (str): The name of the base file.
@@ -69,7 +69,7 @@ class HudiFileSlice:
num_records (int): The number of records in the file slice.
"""
- file_group_id: str
+ file_id: str
partition_path: str
creation_instant_time: str
base_file_name: str
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 4c9b7fd..e11b21a 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -90,7 +90,7 @@ impl HudiFileGroupReader {
#[pyclass]
pub struct HudiFileSlice {
#[pyo3(get)]
- file_group_id: String,
+ file_id: String,
#[pyo3(get)]
partition_path: String,
#[pyo3(get)]
@@ -125,7 +125,7 @@ impl HudiFileSlice {
#[cfg(not(tarpaulin))]
fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
- let file_group_id = f.file_group_id().to_string();
+ let file_id = f.file_id().to_string();
let partition_path = f.partition_path().to_string();
let creation_instant_time = f.creation_instant_time().to_string();
let base_file_name = f.base_file.file_name.clone();
@@ -134,7 +134,7 @@ fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
let base_file_byte_size = file_metadata.byte_size;
let num_records = file_metadata.num_records;
HudiFileSlice {
- file_group_id,
+ file_id,
partition_path,
creation_instant_time,
base_file_name,