This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c16f79a docs: add hudi core API docs with examples (#113)
c16f79a is described below
commit c16f79a742ec7e90276cdf3efaff222910828a90
Author: KnightChess <[email protected]>
AuthorDate: Sun Sep 15 17:39:34 2024 +0800
docs: add hudi core API docs with examples (#113)
---------
Co-authored-by: Shiyan Xu <[email protected]>
---
crates/core/src/config/internal.rs | 16 ++++++
crates/core/src/config/mod.rs | 23 +++++++++
crates/core/src/config/read.rs | 24 +++++++++
crates/core/src/config/table.rs | 58 ++++++++++++++++++++++
crates/core/src/file_group/mod.rs | 15 ++++++
crates/core/src/lib.rs | 31 ++++++++++++
crates/core/src/storage/mod.rs | 10 ++++
crates/core/src/table/mod.rs | 99 ++++++++++++++++++++++++++++++++++++++
8 files changed, 276 insertions(+)
diff --git a/crates/core/src/config/internal.rs
b/crates/core/src/config/internal.rs
index d6ad814..42641f5 100644
--- a/crates/core/src/config/internal.rs
+++ b/crates/core/src/config/internal.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+//! Hudi internal configurations.
use std::collections::HashMap;
use std::str::FromStr;
@@ -25,6 +26,21 @@ use strum_macros::EnumIter;
use crate::config::{ConfigParser, HudiConfigValue};
+/// Configurations for internal use.
+///
+/// **Example**
+///
+/// ```rust
+/// use url::Url;
+/// use hudi_core::config::HudiConfigValue;
+/// use hudi_core::config::internal::HudiInternalConfig::SkipConfigValidation;
+/// use hudi_core::table::Table as HudiTable;
+///
+/// let options = vec![(SkipConfigValidation.as_ref(),
HudiConfigValue::Boolean(true))];
+/// let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+/// HudiTable::new_with_options(base_uri.as_ref(), options);
+/// ```
+///
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiInternalConfig {
SkipConfigValidation,
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 3ba5750..2893c19 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+//! Hudi Configurations.
use std::any::type_name;
use std::collections::HashMap;
use std::sync::Arc;
@@ -28,15 +29,20 @@ pub mod table;
pub const HUDI_CONF_DIR: &str = "HUDI_CONF_DIR";
+/// This defines some common APIs for working with configurations in Hudi.
pub trait ConfigParser: AsRef<str> {
+ /// Configuration value type.
type Output;
+ /// Supplies the default value of the configuration.
fn default_value(&self) -> Option<Self::Output>;
+ /// To indicate if the configuration is required or not, this helps in
validation.
fn is_required(&self) -> bool {
false
}
+ /// Validate the configuration by parsing the given [String] value and
check if it is required.
fn validate(&self, configs: &HashMap<String, String>) -> Result<()> {
match self.parse_value(configs) {
Ok(_) => Ok(()),
@@ -51,8 +57,12 @@ pub trait ConfigParser: AsRef<str> {
}
}
+ /// Parse the [String] value to [Self::Output].
fn parse_value(&self, configs: &HashMap<String, String>) ->
Result<Self::Output>;
+ /// Parse the [String] value to [Self::Output], or return the default
value.
+ ///
+ /// Panic if the default value is not defined.
fn parse_value_or_default(&self, configs: &HashMap<String, String>) ->
Self::Output {
self.parse_value(configs).unwrap_or_else(|_| {
self.default_value()
@@ -61,6 +71,7 @@ pub trait ConfigParser: AsRef<str> {
}
}
+/// All possible data types for Hudi Configuration values.
#[derive(Clone, Debug)]
pub enum HudiConfigValue {
Boolean(bool),
@@ -71,6 +82,13 @@ pub enum HudiConfigValue {
}
impl HudiConfigValue {
+ /// Convert the [HudiConfigValue] logical type to the corresponding data type
in Rust.
+ ///
+ /// - [`HudiConfigValue::Boolean`] -> [bool]
+ /// - [`HudiConfigValue::Integer`] -> [isize]
+ /// - [`HudiConfigValue::UInteger`] -> [usize]
+ /// - [`HudiConfigValue::String`] -> [String]
+ /// - [`HudiConfigValue::List`] -> [`Vec<String>`]
pub fn to<T: 'static + std::fmt::Debug + From<HudiConfigValue>>(self) -> T
{
T::from(self)
}
@@ -124,18 +142,21 @@ impl From<HudiConfigValue> for Vec<String> {
}
}
+/// Hudi configuration container.
#[derive(Clone, Debug)]
pub struct HudiConfigs {
pub raw_configs: Arc<HashMap<String, String>>,
}
impl HudiConfigs {
+ /// Create [HudiConfigs] with key-value pairs of [String]s.
pub fn new(raw_configs: HashMap<String, String>) -> Self {
Self {
raw_configs: Arc::new(raw_configs),
}
}
+ /// Create empty [HudiConfigs].
pub fn empty() -> Self {
Self {
raw_configs: Arc::new(HashMap::new()),
@@ -153,6 +174,7 @@ impl HudiConfigs {
parser.parse_value(&self.raw_configs)
}
+ /// Get value or default value. If the config has no default value, this
will panic.
pub fn get_or_default(
&self,
parser: impl ConfigParser<Output = HudiConfigValue>,
@@ -160,6 +182,7 @@ impl HudiConfigs {
parser.parse_value_or_default(&self.raw_configs)
}
+ /// Get value or default value. If the config has no default value, this
will return [None].
pub fn try_get(
&self,
parser: impl ConfigParser<Output = HudiConfigValue>,
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 195aa70..807a0cf 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+//! Hudi read configurations.
use std::collections::HashMap;
use std::str::FromStr;
@@ -24,9 +25,32 @@ use crate::config::{ConfigParser, HudiConfigValue};
use anyhow::{anyhow, Result};
use strum_macros::EnumIter;
+/// Configurations for reading Hudi tables.
+///
+/// **Example**
+///
+/// ```rust
+/// use url::Url;
+/// use hudi_core::config::read::HudiReadConfig::{AsOfTimestamp,
InputPartitions};
+/// use hudi_core::table::Table as HudiTable;
+///
+/// let options = vec![(InputPartitions.as_ref(), "2"),
+/// (AsOfTimestamp.as_ref(), "20240101010100000")];
+/// let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+/// HudiTable::new_with_options(base_uri.as_ref(), options);
+/// ```
+///
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiReadConfig {
+ /// Define input splits
+ /// - Hoodie Key : hoodie.read.input.partitions
+ ///
+ /// If there are 100 files and [InputPartitions] is 5, this will produce 5 chunks,
+ /// with each iteration or task processing 20 files
InputPartitions,
+
+ /// The query instant for time travel. Without specifying this option, the
latest snapshot is queried.
+ /// - Hoodie Key : hoodie.read.as.of.timestamp
AsOfTimestamp,
}
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index a23ecbd..f55df65 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+//! Hudi table configurations.
use std::collections::HashMap;
use std::str::FromStr;
@@ -26,22 +27,77 @@ use strum_macros::{AsRefStr, EnumIter};
use crate::config::{ConfigParser, HudiConfigValue};
+/// Configurations for Hudi tables, persisted in `hoodie.properties`.
+///
+/// **Example**
+///
+/// ```rust
+/// use url::Url;
+/// use hudi_core::config::table::HudiTableConfig::BaseFileFormat;
+/// use hudi_core::table::Table as HudiTable;
+///
+/// let options = vec![(BaseFileFormat.as_ref(), "parquet")];
+/// let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+/// HudiTable::new_with_options(base_uri.as_ref(), options);
+/// ```
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiTableConfig {
+ /// Base file format
+ ///
+ /// Currently only parquet is supported.
BaseFileFormat,
+
+ /// Table checksum is used to guard against partial writes in HDFS.
+ /// It is added as the last entry in hoodie.properties and then used to
validate while reading table config.
Checksum,
+
+ /// Database name that will be used for incremental query.
+ /// If different databases have the same table name during incremental
query,
+ /// we can set it to limit the table name under a specific database
DatabaseName,
+
+ /// When set to true, will not write the partition columns into hudi. By
default, false.
DropsPartitionFields,
+
+ /// Flag to indicate whether to use Hive style partitioning.
+ /// If set to true, the names of partition folders follow the
<partition_column_name>=<partition_value> format.
+ /// By default false (the names of partition folders are only partition
values)
IsHiveStylePartitioning,
+
+ /// Should we url encode the partition path value, before creating the
folder structure.
IsPartitionPathUrlencoded,
+
+ /// Key Generator class property for the hoodie table
KeyGeneratorClass,
+
+ /// Fields used to partition the table. Concatenated values of these
fields are used as
+ /// the partition path, by invoking toString().
+ /// These fields also include the partition type which is used by custom
key generators
PartitionFields,
+
+ /// Field used in preCombining before actual write. By default, when two
records have the same key value,
+ /// the largest value for the precombine field determined by
Object.compareTo(..), is picked.
PrecombineField,
+
+ /// When enabled, populates all meta fields. When disabled, no meta fields
are populated
+ /// and incremental queries will not be functional. This is only meant to
be used for append only/immutable data for batch processing
PopulatesMetaFields,
+
+ /// Columns used to uniquely identify the table.
+ /// Concatenated values of these fields are used as the record key
component of HoodieKey.
RecordKeyFields,
+
+ /// Table name that will be used for registering with Hive. Needs to be
same across runs.
TableName,
+
+ /// The table type for the underlying data, for this write. This can’t
change between writes.
TableType,
+
+ /// Version of table, used for running upgrade/downgrade steps between
releases with potentially
+ /// breaking/backwards compatible changes.
TableVersion,
+
+ /// Version of the timeline used by the table.
TimelineLayoutVersion,
}
@@ -129,6 +185,7 @@ impl ConfigParser for HudiTableConfig {
}
}
+/// Config value for [HudiTableConfig::TableType].
#[derive(Clone, Debug, PartialEq, AsRefStr)]
pub enum TableTypeValue {
#[strum(serialize = "COPY_ON_WRITE")]
@@ -149,6 +206,7 @@ impl FromStr for TableTypeValue {
}
}
+/// Config value for [HudiTableConfig::BaseFileFormat].
#[derive(Clone, Debug, PartialEq, AsRefStr)]
pub enum BaseFileFormatValue {
#[strum(serialize = "parquet")]
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index c947de4..7afe537 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -16,6 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+//! This module is for File Group related models and APIs.
+//!
+//! A set of data/base files + set of log files, that make up a unit for all
operations.
use std::collections::BTreeMap;
use std::fmt;
@@ -29,15 +32,21 @@ use crate::storage::file_info::FileInfo;
use crate::storage::file_stats::FileStats;
use crate::storage::Storage;
+/// Represents common metadata about a Hudi Base File.
#[derive(Clone, Debug)]
pub struct BaseFile {
+ /// The file group id that is unique across the table.
pub file_group_id: String,
+
pub commit_time: String,
+
pub info: FileInfo,
+
pub stats: Option<FileStats>,
}
impl BaseFile {
+ /// Parse file name and extract file_group_id and commit_time.
fn parse_file_name(file_name: &str) -> Result<(String, String)> {
let err_msg = format!("Failed to parse file name '{}' for base file.",
file_name);
let (name, _) =
file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?;
@@ -47,6 +56,7 @@ impl BaseFile {
Ok((file_group_id, commit_time))
}
+ /// Construct [BaseFile] with the base file name.
pub fn from_file_name(file_name: &str) -> Result<Self> {
let (file_group_id, commit_time) = Self::parse_file_name(file_name)?;
Ok(Self {
@@ -57,6 +67,7 @@ impl BaseFile {
})
}
+ /// Construct [BaseFile] with the [FileInfo].
pub fn from_file_info(info: FileInfo) -> Result<Self> {
let (file_group_id, commit_time) = Self::parse_file_name(&info.name)?;
Ok(Self {
@@ -68,6 +79,10 @@ impl BaseFile {
}
}
+/// Within a file group, a slice is a combination of data file written at a
commit time and list of log files,
+/// containing changes to the data file from that commit time.
+///
+/// [note] The log files are not yet supported.
#[derive(Clone, Debug)]
pub struct FileSlice {
pub base_file: BaseFile,
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 9b492e9..3369101 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -16,6 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
+//! Crate `hudi-core`.
+//!
+//! # The [config] module is responsible for managing configurations.
+//!
+//! **Example**
+//!
+//! ```rust
+//! use url::Url;
+//! use hudi_core::config::read::HudiReadConfig::{AsOfTimestamp,
InputPartitions};
+//! use hudi_core::table::Table as HudiTable;
+//!
+//! let options = vec![(InputPartitions.as_ref(), "2"),
+//! (AsOfTimestamp.as_ref(), "20240101010100000")];
+//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+//! HudiTable::new_with_options(base_uri.as_ref(), options);
+//! ```
+//!
+//! # The [table] module is responsible for managing Hudi tables.
+//!
+//! **Example**
+//!
+//! create hudi table
+//! ```rust
+//! use url::Url;
+//! use hudi_core::table::Table;
+//!
+//! pub async fn test() {
+//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
+//! }
+//! ```
pub mod config;
pub mod file_group;
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 5ddfde4..9985d0c 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+//! This module is responsible for interacting with the storage layer.
use std::collections::HashMap;
use std::path::PathBuf;
@@ -185,6 +186,15 @@ impl Storage {
}
}
+/// Get relative paths of leaf directories under a given directory.
+///
+/// **Example**
+/// - /usr/hudi/table_name
+/// - /usr/hudi/table_name/.hoodie
+/// - /usr/hudi/table_name/dt=2024/month=01/day=01
+/// - /usr/hudi/table_name/dt=2025/month=02
+///
+/// the result is \[".hoodie", "dt=2024/month=01/day=01", "dt=2025/month=02"\]
#[async_recursion]
pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) ->
Result<Vec<String>> {
let mut leaf_dirs = Vec::new();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e81cabd..b20f4ca 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -16,6 +16,73 @@
* specific language governing permissions and limitations
* under the License.
*/
+//! This module is responsible for Hudi table APIs.
+//!
+//! It provides a quick entry point for reading Hudi table metadata and data,
+//! facilitating adaptation and compatibility across various engines.
+//!
+//! **Example**
+//! 1. create hudi table
+//! ```rust
+//! use url::Url;
+//! use hudi_core::table::Table;
+//!
+//! pub async fn test() {
+//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
+//! }
+//! ```
+//! 2. get hudi table schema (arrow_schema::Schema)
+//! ```rust
+//! use url::Url;
+//! use hudi_core::table::Table;
+//!
+//! pub async fn test() {
+//! use arrow_schema::Schema;
+//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
+//! let schema = hudi_table.get_schema().await.unwrap();
+//! }
+//! ```
+//! 3. read hudi table
+//! ```rust
+//! use url::Url;
+//! use hudi_core::table::Table;
+//!
+//! pub async fn test() {
+//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
+//! let record_read = hudi_table.read_snapshot().await.unwrap();
+//! }
+//! ```
+//! 4. get file slice
+//! Users can obtain metadata to customize reading methods, read in batches,
perform parallel reads, and more.
+//! ```rust
+//! use url::Url;
+//! use hudi_core::table::Table;
+//! use hudi_core::storage::utils::parse_uri;
+//!
+//! pub async fn test() {
+//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
+//! let file_slices = hudi_table
+//! .split_file_slices(2)
+//! .await.unwrap();
+//! // define how many file slices each parquet reader task should process
+//! let mut parquet_file_groups: Vec<Vec<String>> = Vec::new();
+//! for file_slice_vec in file_slices {
+//! let file_group_vec = file_slice_vec
+//! .iter()
+//! .map(|f| {
+//! let url = parse_uri(&f.base_file.info.uri).unwrap();
+//! let size = f.base_file.info.size as u64;
+//! url.path().to_string()
+//! })
+//! .collect();
+//! parquet_file_groups.push(file_group_vec)
+//! }
+//! }
+//! ```
use std::collections::HashMap;
use std::env;
@@ -48,6 +115,7 @@ use crate::table::timeline::Timeline;
mod fs_view;
mod timeline;
+/// In-memory representation of a Hudi table.
#[derive(Clone, Debug)]
pub struct Table {
pub base_url: Arc<Url>,
@@ -58,10 +126,12 @@ pub struct Table {
}
impl Table {
+ /// Create hudi table by base_uri
pub async fn new(base_uri: &str) -> Result<Self> {
Self::new_with_options(base_uri, empty_options()).await
}
+ /// Create hudi table with options
pub async fn new_with_options<I, K, V>(base_uri: &str, all_options: I) ->
Result<Self>
where
I: IntoIterator<Item = (K, V)>,
@@ -230,10 +300,12 @@ impl Table {
Ok(())
}
+ /// Get the latest [Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
}
+ /// Split the file slices into a specified number of chunks.
pub async fn split_file_slices(&self, n: usize) ->
Result<Vec<Vec<FileSlice>>> {
let n = std::cmp::max(1, n);
let file_slices = self.get_file_slices().await?;
@@ -245,6 +317,9 @@ impl Table {
.collect())
}
+ /// Get all the [FileSlice]s in the table.
+ ///
+ /// If the [AsOfTimestamp] configuration is set, the file slices at the
specified timestamp will be returned.
pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> {
if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
self.get_file_slices_as_of(timestamp.to::<String>().as_str())
@@ -256,6 +331,7 @@ impl Table {
}
}
+ /// Get all the [FileSlice]s at a given timestamp, as a time travel query.
async fn get_file_slices_as_of(&self, timestamp: &str) ->
Result<Vec<FileSlice>> {
let excludes = self.timeline.get_replaced_file_groups().await?;
self.file_system_view
@@ -266,6 +342,9 @@ impl Table {
.get_file_slices_as_of(timestamp, &excludes)
}
+ /// Get all the latest records in the table.
+ ///
+ /// If the [AsOfTimestamp] configuration is set, the records at the
specified timestamp will be returned.
pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
self.read_snapshot_as_of(timestamp.to::<String>().as_str())
@@ -277,6 +356,7 @@ impl Table {
}
}
+ /// Get all the records in the table at a given timestamp, as a time
travel query.
async fn read_snapshot_as_of(&self, timestamp: &str) ->
Result<Vec<RecordBatch>> {
let file_slices = self
.get_file_slices_as_of(timestamp)
@@ -301,6 +381,25 @@ impl Table {
Ok(file_paths)
}
+ /// Read records from a [FileSlice] by its relative path.
+ ///
+ /// **Example**
+ ///
+ /// ```rust
+ /// use url::Url;
+ /// use hudi_core::table::Table;
+ ///
+ /// pub async fn test() {
+ /// let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+ /// let hudi_table = Table::new(base_uri.path()).await.unwrap();
+ /// let batches = hudi_table
+ /// .read_file_slice_by_path(
+ ///
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
+ /// )
+ /// .await
+ /// .unwrap();
+ /// }
+ /// ```
pub async fn read_file_slice_by_path(&self, relative_path: &str) ->
Result<RecordBatch> {
self.file_system_view
.read_file_slice_by_path_unchecked(relative_path)