This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new ce08d9d refactor: improve schema resolution flow (#364)
ce08d9d is described below
commit ce08d9d6546f251b044c1cd6f574339236fabcdb
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Jun 28 00:43:42 2025 -0500
refactor: improve schema resolution flow (#364)
Added `crates/core/src/schema/resolver.rs` to keep all schema resolution
functions.
For empty tables with no commits, get the `hoodie.table.create.schema` from
the table props if available.
---
crates/core/src/config/table.rs | 5 +
crates/core/src/error.rs | 6 +
crates/core/src/schema/mod.rs | 21 +-
crates/core/src/schema/resolver.rs | 198 ++++++++++++++++++
crates/core/src/table/builder.rs | 5 +-
crates/core/src/table/mod.rs | 144 ++++++++-----
crates/core/src/timeline/mod.rs | 222 ++++++++++-----------
.../.hoodie/hoodie.properties | 14 ++
.../.hoodie/20250628002223107.commit | 94 +++++++++
.../.hoodie/20250628002223107.commit.requested} | 0
.../.hoodie/20250628002223107.inflight | 82 ++++++++
...15355d9b0ed-0_2-13-37_20250628002223107.parquet | Bin 0 -> 436045 bytes
...eedbe6e09ed-0_0-13-35_20250628002223107.parquet | Bin 0 -> 436228 bytes
...7e75252c69f-0_1-13-36_20250628002223107.parquet | Bin 0 -> 436064 bytes
.../.hoodie/20250331030642808.deltacommit | 94 +++++++++
.../.hoodie/20250331030642808.deltacommit.inflight | 82 ++++++++
.../20250331030642808.deltacommit.requested} | 0
.../.hoodie/20250331030645735.deltacommit | 43 ++++
.../.hoodie/20250331030645735.deltacommit.inflight | 56 ++++++
.../20250331030645735.deltacommit.requested} | 0
...-5ff632f79224-0_20250331030642808.log.1_0-26-85 | Bin 0 -> 1148 bytes
...ff632f79224-0_0-13-60_20250331030642808.parquet | Bin 0 -> 436406 bytes
.../.hoodie/20240402144910683.commit | 1 +
.../.hoodie/20240402144910683.commit.requested | 0
.../.hoodie/20240402144910683.inflight | 0
.../.hoodie/hoodie.properties | 0
.../.hoodie/20240402144910683.commit | 6 +
.../.hoodie/20240402144910683.commit.requested | 0
.../.hoodie/20240402144910683.inflight | 0
.../.hoodie/hoodie.properties | 0
.../.hoodie/20240402144910683.commit | 0
.../.hoodie/20240402144910683.commit.requested | 0
.../.hoodie/20240402144910683.inflight | 0
.../.hoodie/hoodie.properties | 0
crates/datafusion/src/lib.rs | 9 +-
crates/test/src/util.rs | 32 +++
36 files changed, 931 insertions(+), 183 deletions(-)
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 149938e..5264236 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -56,6 +56,9 @@ pub enum HudiTableConfig {
/// It is added as the last entry in hoodie.properties and then used to
validate while reading table config.
Checksum,
+ /// Avro schema used when creating the table.
+ CreateSchema,
+
/// Database name that will be used for incremental query.
/// If different databases have the same table name during incremental
query,
/// we can set it to limit the table name under a specific database
@@ -122,6 +125,7 @@ impl AsRef<str> for HudiTableConfig {
Self::BaseFileFormat => "hoodie.table.base.file.format",
Self::BasePath => "hoodie.base.path",
Self::Checksum => "hoodie.table.checksum",
+ Self::CreateSchema => "hoodie.table.create.schema",
Self::DatabaseName => "hoodie.database.name",
Self::DropsPartitionFields =>
"hoodie.datasource.write.drop.partition.columns",
Self::IsHiveStylePartitioning =>
"hoodie.datasource.write.hive_style_partitioning",
@@ -186,6 +190,7 @@ impl ConfigParser for HudiTableConfig {
isize::from_str(v).map_err(|e| ParseInt(self.key(),
v.to_string(), e))
})
.map(HudiConfigValue::Integer),
+ Self::CreateSchema => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::DatabaseName => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::DropsPartitionFields => get_result
.and_then(|v| {
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index 9d4f25b..2cff435 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -42,6 +42,9 @@ pub enum CoreError {
#[error("Data type error: {0}")]
Schema(String),
+ #[error("{0}")]
+ SchemaNotFound(String),
+
#[error("File group error: {0}")]
FileGroup(String),
@@ -72,6 +75,9 @@ pub enum CoreError {
#[error("Timeline error: {0}")]
Timeline(String),
+ #[error("Timeline has no completed commit.")]
+ TimelineNoCommit,
+
#[error("{0}")]
TimestampParsingError(String),
diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs
index 15c052e..52ce19a 100644
--- a/crates/core/src/schema/mod.rs
+++ b/crates/core/src/schema/mod.rs
@@ -21,6 +21,7 @@ use crate::metadata::meta_field::MetaField;
use arrow_schema::{Schema, SchemaRef};
pub mod delete;
+pub mod resolver;
pub fn prepend_meta_fields(schema: SchemaRef) -> Result<Schema> {
let meta_field_schema = MetaField::schema();
@@ -28,6 +29,7 @@ pub fn prepend_meta_fields(schema: SchemaRef) ->
Result<Schema> {
.map_err(CoreError::ArrowError)
}
+// TODO use this when applicable, like some table config says there is an
operation field
pub fn prepend_meta_fields_with_operation(schema: SchemaRef) -> Result<Schema>
{
let meta_field_schema = MetaField::schema_with_operation();
Schema::try_merge([meta_field_schema.as_ref().clone(),
schema.as_ref().clone()])
@@ -38,27 +40,26 @@ pub fn prepend_meta_fields_with_operation(schema:
SchemaRef) -> Result<Schema> {
mod tests {
use super::*;
use arrow_schema::{DataType, Field};
+ use hudi_test::assert_arrow_field_names_eq;
use std::sync::Arc;
#[test]
fn test_prepend_meta_fields() {
let schema = Schema::new(vec![Field::new("field1", DataType::Int32,
false)]);
let new_schema = prepend_meta_fields(Arc::new(schema)).unwrap();
- assert_eq!(new_schema.fields().len(), 6);
-
- let field_names: Vec<_> = new_schema.fields().iter().map(|f|
f.name()).collect();
- assert_eq!(field_names[..5], MetaField::field_names());
- assert_eq!(field_names[5], "field1");
+ assert_arrow_field_names_eq!(
+ new_schema,
+ [MetaField::field_names(), vec!["field1"]].concat()
+ )
}
#[test]
fn test_prepend_meta_fields_with_operation() {
let schema = Schema::new(vec![Field::new("field1", DataType::Int32,
false)]);
let new_schema =
prepend_meta_fields_with_operation(Arc::new(schema)).unwrap();
- assert_eq!(new_schema.fields().len(), 7);
-
- let field_names: Vec<_> = new_schema.fields().iter().map(|f|
f.name()).collect();
- assert_eq!(field_names[..6], MetaField::field_names_with_operation());
- assert_eq!(field_names[6], "field1");
+ assert_arrow_field_names_eq!(
+ new_schema,
+ [MetaField::field_names_with_operation(), vec!["field1"]].concat()
+ )
}
}
diff --git a/crates/core/src/schema/resolver.rs
b/crates/core/src/schema/resolver.rs
new file mode 100644
index 0000000..7095849
--- /dev/null
+++ b/crates/core/src/schema/resolver.rs
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::avro_to_arrow::to_arrow_schema;
+use crate::config::table::HudiTableConfig;
+use crate::error::{CoreError, Result};
+use crate::schema::prepend_meta_fields;
+use crate::storage::Storage;
+use crate::table::Table;
+use apache_avro::schema::Schema as AvroSchema;
+use arrow_schema::{Schema, SchemaRef};
+use serde_json::{Map, Value};
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::Arc;
+
+/// Resolves the [`arrow_schema::Schema`] for a given Hudi table.
+///
+/// The resolution process follows these steps:
+/// - If the timeline has commit metadata, read the schema field from it.
+/// - If the commit metadata has no schema, read the schema from the base
file pointed by the first entry in the write-status of the commit metadata.
+/// - If the timeline has no commit metadata, read
[`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
+pub async fn resolve_schema(table: &Table) -> Result<Schema> {
+ let timeline = table.get_timeline();
+ match timeline.get_latest_commit_metadata().await {
+ Ok(metadata) => {
+ resolve_schema_from_commit_metadata(&metadata,
timeline.storage.clone()).await
+ }
+ Err(CoreError::TimelineNoCommit) => {
+ if let Some(create_schema) =
table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
+ let avro_schema_str = create_schema.to::<String>();
+ let arrow_schema =
arrow_schema_from_avro_schema_str(&avro_schema_str)?;
+ prepend_meta_fields(SchemaRef::new(arrow_schema))
+ } else {
+ Err(CoreError::SchemaNotFound(
+ "No completed commit, and no create schema for the
table.".to_string(),
+ ))
+ }
+ }
+ Err(e) => Err(e),
+ }
+}
+
+/// Resolves the [`apache_avro::schema::Schema`] as a [`String`] for a given
Hudi table.
+///
+/// The resolution process follows these steps:
+/// - If the timeline has commit metadata, read the schema field from it.
+/// - If the timeline has no commit metadata, read
[`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
+///
+/// ### Note
+///
+/// - For resolving Avro schema, we don't read the schema from a base file
like we do when resolving Arrow schema.
+/// - Avro schema does not contain [`MetaField`]s.
+pub async fn resolve_avro_schema(table: &Table) -> Result<String> {
+ let timeline = table.get_timeline();
+ match timeline.get_latest_commit_metadata().await {
+ Ok(metadata) => resolve_avro_schema_from_commit_metadata(&metadata),
+ Err(CoreError::TimelineNoCommit) => {
+ if let Some(create_schema) =
table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
+ let create_schema = create_schema.to::<String>();
+ Ok(sanitize_avro_schema_str(&create_schema))
+ } else {
+ Err(CoreError::SchemaNotFound(
+ "No completed commit, and no create schema for the
table.".to_string(),
+ ))
+ }
+ }
+ Err(e) => Err(e),
+ }
+}
+
+pub(crate) async fn resolve_schema_from_commit_metadata(
+ commit_metadata: &Map<String, Value>,
+ storage: Arc<Storage>,
+) -> Result<Schema> {
+ let avro_schema_str = match
resolve_avro_schema_from_commit_metadata(commit_metadata) {
+ Ok(s) => s,
+ Err(CoreError::SchemaNotFound(_)) => {
+ return resolve_schema_from_base_file(commit_metadata,
storage).await
+ }
+ Err(e) => return Err(e),
+ };
+
+ let arrow_schema = arrow_schema_from_avro_schema_str(&avro_schema_str)?;
+ prepend_meta_fields(SchemaRef::new(arrow_schema))
+}
+
+pub(crate) fn resolve_avro_schema_from_commit_metadata(
+ commit_metadata: &Map<String, Value>,
+) -> Result<String> {
+ if commit_metadata.is_empty() {
+ return Err(CoreError::CommitMetadata(
+ "Commit metadata is empty.".to_string(),
+ ));
+ }
+
+ match extract_avro_schema_from_commit_metadata(commit_metadata) {
+ Some(schema) => Ok(schema),
+ None => Err(CoreError::SchemaNotFound(
+ "No schema found in the commit metadata.".to_string(),
+ )),
+ }
+}
+
+async fn resolve_schema_from_base_file(
+ commit_metadata: &Map<String, Value>,
+ storage: Arc<Storage>,
+) -> Result<Schema> {
+ let first_partition = commit_metadata
+ .get("partitionToWriteStats")
+ .and_then(|v| v.as_object());
+
+ let partition_path = first_partition
+ .and_then(|obj| obj.keys().next())
+ .ok_or_else(|| {
+ CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: no write status in
commit metadata"
+ .to_string(),
+ )
+ })?;
+
+ let first_value = first_partition
+ .and_then(|obj| obj.values().next())
+ .and_then(|value| value.as_array())
+ .and_then(|arr| arr.first());
+
+ let base_file_path = first_value.and_then(|v| v["path"].as_str());
+ match base_file_path {
+ Some(path) if path.ends_with(".parquet") => {
+ Ok(storage.get_parquet_file_schema(path).await?)
+ }
+ Some(_) => {
+ // deltacommit case
+ // TODO: properly parse deltacommit structure
+ let base_file = first_value
+ .and_then(|v| v["baseFile"].as_str())
+ .ok_or_else(|| {
+ CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: no file path
found".to_string(),
+ )
+ })?;
+ let parquet_file_path_buf = PathBuf::from_str(partition_path)
+ .map_err(|e| {
+ CoreError::CommitMetadata(format!("Failed to resolve the
latest schema: {}", e))
+ })?
+ .join(base_file);
+ let path = parquet_file_path_buf.to_str().ok_or_else(|| {
+ CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: invalid file
path".to_string(),
+ )
+ })?;
+ Ok(storage.get_parquet_file_schema(path).await?)
+ }
+ None => Err(CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: no file path
found".to_string(),
+ )),
+ }
+}
+
+fn sanitize_avro_schema_str(avro_schema_str: &str) -> String {
+ avro_schema_str.trim().replace("\\:", ":")
+}
+
+fn arrow_schema_from_avro_schema_str(avro_schema_str: &str) -> Result<Schema> {
+ let s = sanitize_avro_schema_str(avro_schema_str);
+ let avro_schema = AvroSchema::parse_str(&s)
+ .map_err(|e| CoreError::Schema(format!("Failed to parse Avro schema:
{}", e)))?;
+
+ to_arrow_schema(&avro_schema)
+}
+
+fn extract_avro_schema_from_commit_metadata(
+ commit_metadata: &Map<String, Value>,
+) -> Option<String> {
+ commit_metadata
+ .get("extraMetadata")
+ .and_then(|v| v.as_object())
+ .and_then(|obj| {
+ obj.get("schema")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string())
+ })
+}
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index d123126..8b5bf92 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -233,8 +233,9 @@ impl OptionResolver {
let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?;
let table_properties = parse_data_for_options(&bytes, "=")?;
- // We currently treat all table properties as the highest precedence,
which is valid for most cases.
- // TODO: handle the case where the same key is present in both table
properties and options
+ // Table properties on storage (hoodie.properties) should have the
highest precedence,
+ // except for writer-changeable properties like enabling metadata
table/indexes.
+ // TODO: return err when user-provided options conflict with table
properties
for (k, v) in table_properties {
options.insert(k.to_string(), v.to_string());
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 949ca80..7d281f7 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -100,6 +100,7 @@ use crate::config::HudiConfigs;
use crate::expr::filter::{from_str_tuples, Filter};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
+use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
use crate::table::builder::TableBuilder;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
@@ -218,8 +219,17 @@ impl Table {
}
/// Get the latest Avro schema string of the table.
+ ///
+ /// The implementation looks for the schema in the following order:
+ /// 1. Timeline commit metadata.
+ /// 2. `hoodie.properties` file's [HudiTableConfig::CreateSchema].
+ ///
+ /// ### Note
+ ///
+ /// The schema returned does not contain Hudi's [MetaField]s,
+ /// which is different from the one returned by [Table::get_schema].
pub async fn get_avro_schema(&self) -> Result<String> {
- self.timeline.get_latest_avro_schema().await
+ resolve_avro_schema(self).await
}
/// Same as [Table::get_avro_schema], but blocking.
@@ -231,8 +241,13 @@ impl Table {
}
/// Get the latest [arrow_schema::Schema] of the table.
+ ///
+ /// The implementation looks for the schema in the following order:
+ /// 1. Timeline commit metadata.
+ /// 2. Base file schema.
+ /// 3. `hoodie.properties` file's [HudiTableConfig::CreateSchema].
pub async fn get_schema(&self) -> Result<Schema> {
- self.timeline.get_latest_schema().await
+ resolve_schema(self).await
}
/// Same as [Table::get_schema], but blocking.
@@ -706,9 +721,11 @@ mod tests {
};
use crate::config::util::{empty_filters, empty_options};
use crate::config::HUDI_CONF_DIR;
+ use crate::error::CoreError;
+ use crate::metadata::meta_field::MetaField;
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
- use hudi_test::SampleTable;
+ use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq,
SampleTable};
use std::collections::HashSet;
use std::fs::canonicalize;
use std::path::PathBuf;
@@ -784,67 +801,88 @@ mod tests {
}
}
+ #[test]
+ fn hudi_table_get_schema_from_empty_table_without_create_schema() {
+ let table =
get_test_table_without_validation("table_props_no_create_schema");
+
+ let schema = table.get_schema_blocking();
+ assert!(schema.is_err());
+ assert!(matches!(schema.unwrap_err(), CoreError::SchemaNotFound(_)));
+
+ let schema = table.get_avro_schema_blocking();
+ assert!(schema.is_err());
+ assert!(matches!(schema.unwrap_err(), CoreError::SchemaNotFound(_)));
+ }
+
+ #[test]
+ fn
hudi_table_get_schema_from_empty_table_resolves_to_table_create_schema() {
+ for base_url in SampleTable::V6Empty.urls() {
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+
+ // Validate the Arrow schema
+ let schema = hudi_table.get_schema_blocking();
+ assert!(schema.is_ok());
+ let schema = schema.unwrap();
+ assert_arrow_field_names_eq!(
+ schema,
+ [MetaField::field_names(), vec!["id", "name",
"isActive"]].concat()
+ );
+
+ // Validate the Avro schema
+ let avro_schema = hudi_table.get_avro_schema_blocking();
+ assert!(avro_schema.is_ok());
+ let avro_schema = avro_schema.unwrap();
+ assert_avro_field_names_eq!(&avro_schema, ["id", "name",
"isActive"])
+ }
+ }
+
#[test]
fn hudi_table_get_schema() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let hudi_table = Table::new_blocking(base_url.path()).unwrap();
- let fields: Vec<String> = hudi_table
- .get_schema_blocking()
- .unwrap()
- .flattened_fields()
- .into_iter()
- .map(|f| f.name().to_string())
- .collect();
- assert_eq!(
- fields,
- vec![
- "_hoodie_commit_time",
- "_hoodie_commit_seqno",
- "_hoodie_record_key",
- "_hoodie_partition_path",
- "_hoodie_file_name",
- "id",
- "name",
- "isActive",
- "byteField",
- "shortField",
- "intField",
- "longField",
- "floatField",
- "doubleField",
- "decimalField",
- "dateField",
- "timestampField",
- "binaryField",
- "arrayField",
- "element",
- "arr_struct_f1",
- "arr_struct_f2",
- "mapField",
- "map_field_value_struct_f1",
- "map_field_value_struct_f2",
- "structField",
- "field1",
- "field2",
- "child_struct",
- "child_field1",
- "child_field2"
- ]
+ let original_field_names = [
+ "id",
+ "name",
+ "isActive",
+ "byteField",
+ "shortField",
+ "intField",
+ "longField",
+ "floatField",
+ "doubleField",
+ "decimalField",
+ "dateField",
+ "timestampField",
+ "binaryField",
+ "arrayField",
+ "mapField",
+ "structField",
+ ];
+
+ // Check Arrow schema
+ let arrow_schema = hudi_table.get_schema_blocking();
+ assert!(arrow_schema.is_ok());
+ let arrow_schema = arrow_schema.unwrap();
+ assert_arrow_field_names_eq!(
+ arrow_schema,
+ [MetaField::field_names(), original_field_names.to_vec()].concat()
);
+
+ // Check Avro schema
+ let avro_schema = hudi_table.get_avro_schema_blocking();
+ assert!(avro_schema.is_ok());
+ let avro_schema = avro_schema.unwrap();
+ assert_avro_field_names_eq!(&avro_schema, original_field_names);
}
#[test]
fn hudi_table_get_partition_schema() {
let base_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow();
let hudi_table = Table::new_blocking(base_url.path()).unwrap();
- let fields: Vec<String> = hudi_table
- .get_partition_schema_blocking()
- .unwrap()
- .flattened_fields()
- .into_iter()
- .map(|f| f.name().to_string())
- .collect();
- assert_eq!(fields, vec!["ts_str"]);
+ let schema = hudi_table.get_partition_schema_blocking();
+ assert!(schema.is_ok());
+ let schema = schema.unwrap();
+ assert_arrow_field_names_eq!(schema, ["ts_str"]);
}
#[test]
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index b76747b..094c07d 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -20,25 +20,24 @@ pub mod instant;
pub(crate) mod selector;
pub(crate) mod util;
-use crate::avro_to_arrow::to_arrow_schema;
use crate::config::HudiConfigs;
use crate::error::CoreError;
use crate::file_group::builder::{build_file_groups,
build_replaced_file_groups, FileGroupMerger};
use crate::file_group::FileGroup;
use crate::metadata::HUDI_METADATA_DIR;
-use crate::schema::prepend_meta_fields;
+use crate::schema::resolver::{
+ resolve_avro_schema_from_commit_metadata,
resolve_schema_from_commit_metadata,
+};
use crate::storage::Storage;
use crate::timeline::instant::Action;
use crate::timeline::selector::TimelineSelector;
use crate::Result;
-use arrow_schema::{Schema, SchemaRef};
+use arrow_schema::Schema;
use instant::Instant;
use log::debug;
use serde_json::{Map, Value};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
-use std::path::PathBuf;
-use std::str::FromStr;
use std::sync::Arc;
/// A [Timeline] contains transaction logs of all actions performed on the
table at different [Instant]s of time.
@@ -214,10 +213,10 @@ impl Timeline {
.map_err(|e| CoreError::Timeline(format!("Failed to get commit
metadata: {}", e)))
}
- async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
+ pub(crate) async fn get_latest_commit_metadata(&self) ->
Result<Map<String, Value>> {
match self.completed_commits.iter().next_back() {
Some(instant) => self.get_instant_metadata(instant).await,
- None => Ok(Map::new()),
+ None => Err(CoreError::TimelineNoCommit),
}
}
@@ -232,101 +231,28 @@ impl Timeline {
///
/// Only completed commits are considered.
pub fn get_latest_commit_timestamp(&self) -> Result<String> {
- self.get_latest_commit_timestamp_as_option().map_or_else(
- || Err(CoreError::Timeline("No commits found".to_string())),
- |t| Ok(t.to_string()),
- )
- }
-
- fn extract_avro_schema_from_commit_metadata(
- commit_metadata: &Map<String, Value>,
- ) -> Option<String> {
- commit_metadata
- .get("extraMetadata")
- .and_then(|v| v.as_object())
- .and_then(|obj| {
- obj.get("schema")
- .and_then(|v| v.as_str())
- .map(|s| s.to_string())
- })
+ self.get_latest_commit_timestamp_as_option()
+ .map_or_else(|| Err(CoreError::TimelineNoCommit), |t|
Ok(t.to_string()))
}
- /// Get the latest Avro schema string from the [Timeline].
+ /// Get the latest [apache_avro::schema::Schema] as [String] from the
[Timeline].
+ ///
+ /// ### Note
+ /// This API behaves differently from
[crate::table::Table::get_avro_schema],
+ /// which additionally looks for [HudiTableConfig::CreateSchema] in the
table config.
pub async fn get_latest_avro_schema(&self) -> Result<String> {
let commit_metadata = self.get_latest_commit_metadata().await?;
-
Self::extract_avro_schema_from_commit_metadata(&commit_metadata).ok_or_else(|| {
- CoreError::CommitMetadata(
- "Failed to resolve the latest schema: no schema
found".to_string(),
- )
- })
+ resolve_avro_schema_from_commit_metadata(&commit_metadata)
}
/// Get the latest [arrow_schema::Schema] from the [Timeline].
+ ///
+ /// ### Note
+ /// This API behaves differently from [crate::table::Table::get_schema],
+ /// which additionally looks for [HudiTableConfig::CreateSchema] in the
table config.
pub async fn get_latest_schema(&self) -> Result<Schema> {
let commit_metadata = self.get_latest_commit_metadata().await?;
-
- if let Some(avro_schema) =
Self::extract_avro_schema_from_commit_metadata(&commit_metadata)
- {
- let avro_schema =
apache_avro::schema::Schema::parse_str(&avro_schema)?;
- let arrow_schema = to_arrow_schema(&avro_schema).map_err(|e| {
- CoreError::CommitMetadata(format!(
- "Failed to convert the latest Avro schema: {}",
- e
- ))
- })?;
- return prepend_meta_fields(SchemaRef::new(arrow_schema));
- }
-
- let first_partition = commit_metadata
- .get("partitionToWriteStats")
- .and_then(|v| v.as_object());
-
- let partition_path = first_partition
- .and_then(|obj| obj.keys().next())
- .ok_or_else(|| {
- CoreError::CommitMetadata(
- "Failed to resolve the latest schema: no partition path
found".to_string(),
- )
- })?;
-
- let first_value = first_partition
- .and_then(|obj| obj.values().next())
- .and_then(|value| value.as_array())
- .and_then(|arr| arr.first());
-
- let parquet_path = first_value.and_then(|v| v["path"].as_str());
- match parquet_path {
- Some(path) if path.ends_with(".parquet") => {
- Ok(self.storage.get_parquet_file_schema(path).await?)
- }
- Some(_) => {
- // TODO: properly handle deltacommit
- let base_file = first_value
- .and_then(|v| v["baseFile"].as_str())
- .ok_or_else(|| {
- CoreError::CommitMetadata(
- "Failed to resolve the latest schema: no file path
found".to_string(),
- )
- })?;
- let parquet_file_path_buf = PathBuf::from_str(partition_path)
- .map_err(|e| {
- CoreError::CommitMetadata(format!(
- "Failed to resolve the latest schema: {}",
- e
- ))
- })?
- .join(base_file);
- let path = parquet_file_path_buf.to_str().ok_or_else(|| {
- CoreError::CommitMetadata(
- "Failed to resolve the latest schema: invalid file
path".to_string(),
- )
- })?;
- Ok(self.storage.get_parquet_file_schema(path).await?)
- }
- None => Err(CoreError::CommitMetadata(
- "Failed to resolve the latest schema: no file path
found".to_string(),
- )),
- }
+ resolve_schema_from_commit_metadata(&commit_metadata,
self.storage.clone()).await
}
pub(crate) async fn get_replaced_file_groups_as_of(
@@ -392,7 +318,7 @@ mod tests {
use url::Url;
- use hudi_test::SampleTable;
+ use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq,
SampleTable};
use crate::config::table::HudiTableConfig;
use crate::metadata::meta_field::MetaField;
@@ -420,11 +346,10 @@ mod tests {
let timeline = create_test_timeline(base_url).await;
let table_schema = timeline.get_latest_schema().await;
assert!(table_schema.is_err());
- assert!(table_schema
- .err()
- .unwrap()
- .to_string()
- .starts_with("Commit metadata error: Failed to resolve the latest
schema:"))
+ assert!(matches!(
+ table_schema.unwrap_err(),
+ CoreError::TimelineNoCommit
+ ))
}
#[tokio::test]
@@ -473,39 +398,104 @@ mod tests {
}
#[tokio::test]
- async fn get_avro_schema() {
+ async fn timeline_get_schema_returns_error_for_no_schema_and_write_stats()
{
let base_url = Url::from_file_path(
-
canonicalize(Path::new("tests/data/timeline/commits_with_valid_schema")).unwrap(),
+ canonicalize(Path::new(
+ "tests/data/timeline/commits_with_no_schema_and_write_stats",
+ ))
+ .unwrap(),
)
.unwrap();
let timeline = create_test_timeline(base_url).await;
+ // Check Arrow schema
+ let arrow_schema = timeline.get_latest_schema().await;
+ assert!(arrow_schema.is_err());
+ assert!(matches!(arrow_schema.unwrap_err(),
CoreError::CommitMetadata(_)), "Getting Arrow schema includes base file lookup,
therefore expect CommitMetadata error when write stats are missing");
+
+ // Check Avro schema
let avro_schema = timeline.get_latest_avro_schema().await;
- assert!(avro_schema.is_ok());
- assert_eq!(
- avro_schema.unwrap(),
-
"{\"type\":\"record\",\"name\":\"v6_trips_record\",\"namespace\":\"hoodie.v6_trips\",\"fields\":[{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"uuid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rider\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"driver\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"stri
[...]
- )
+ assert!(avro_schema.is_err());
+ assert!(matches!(avro_schema.unwrap_err(),
CoreError::SchemaNotFound(_)), "Getting Avro schema does not include base file
lookup, therefore expect SchemaNotFound error when `extraMetadata.schema` is
missing");
}
#[tokio::test]
- async fn get_arrow_schema() {
+ async fn timeline_get_schema_from_commit_metadata() {
let base_url = Url::from_file_path(
-
canonicalize(Path::new("tests/data/timeline/commits_with_valid_schema")).unwrap(),
+ canonicalize(Path::new(
+
"tests/data/timeline/commits_with_valid_schema_in_commit_metadata",
+ ))
+ .unwrap(),
)
.unwrap();
let timeline = create_test_timeline(base_url).await;
+ // Check Arrow schema
let arrow_schema = timeline.get_latest_schema().await;
assert!(arrow_schema.is_ok());
let arrow_schema = arrow_schema.unwrap();
- let fields = arrow_schema
- .fields
- .iter()
- .map(|f| f.name())
- .collect::<Vec<_>>();
- let mut expected_fields = MetaField::field_names();
- expected_fields.extend_from_slice(&["ts", "uuid", "rider", "driver",
"fare", "city"]);
- assert_eq!(fields, expected_fields)
+ assert_arrow_field_names_eq!(
+ arrow_schema,
+ [
+ MetaField::field_names(),
+ vec!["ts", "uuid", "rider", "driver", "fare", "city"]
+ ]
+ .concat()
+ );
+
+ // Check Avro schema
+ let avro_schema = timeline.get_latest_avro_schema().await;
+ assert!(avro_schema.is_ok());
+ let avro_schema = avro_schema.unwrap();
+ assert_avro_field_names_eq!(
+ &avro_schema,
+ ["ts", "uuid", "rider", "driver", "fare", "city"]
+ );
+ }
+
+ #[tokio::test]
+ async fn timeline_get_schema_from_empty_commit_metadata() {
+ let base_url = Url::from_file_path(
+ canonicalize(Path::new(
+ "tests/data/timeline/commits_with_empty_commit_metadata",
+ ))
+ .unwrap(),
+ )
+ .unwrap();
+ let timeline = create_test_timeline(base_url).await;
+
+ // Check Arrow schema
+ let result = timeline.get_latest_schema().await;
+ assert!(result.is_err());
+ assert!(matches!(result.unwrap_err(), CoreError::CommitMetadata(_)));
+
+ // Check Avro schema
+ let result = timeline.get_latest_avro_schema().await;
+ assert!(result.is_err());
+ assert!(matches!(result.unwrap_err(), CoreError::CommitMetadata(_)));
+ }
+
+ #[tokio::test]
+ async fn timeline_get_schema_from_base_file() {
+ let timeline_base_urls = [
+ "tests/data/timeline/commits_load_schema_from_base_file_cow",
+ "tests/data/timeline/commits_load_schema_from_base_file_mor",
+ ];
+ for base_url in timeline_base_urls {
+ let base_url =
Url::from_file_path(canonicalize(Path::new(base_url)).unwrap()).unwrap();
+ let timeline = create_test_timeline(base_url).await;
+
+ let arrow_schema = timeline.get_latest_schema().await;
+ assert!(arrow_schema.is_ok());
+ let arrow_schema = arrow_schema.unwrap();
+ assert_arrow_field_names_eq!(
+ arrow_schema,
+ [
+ MetaField::field_names(),
+ vec!["ts", "uuid", "rider", "driver", "fare", "city"]
+ ]
+ .concat()
+ );
+ }
}
}
diff --git
a/crates/core/tests/data/table_props_no_create_schema/.hoodie/hoodie.properties
b/crates/core/tests/data/table_props_no_create_schema/.hoodie/hoodie.properties
new file mode 100644
index 0000000..af3c8b1
--- /dev/null
+++
b/crates/core/tests/data/table_props_no_create_schema/.hoodie/hoodie.properties
@@ -0,0 +1,14 @@
+#Properties saved on 2024-05-10T16:20:08.106Z
+#Fri May 10 11:20:08 CDT 2024
+hoodie.datasource.write.drop.partition.columns=false
+hoodie.table.type=COPY_ON_WRITE
+hoodie.archivelog.folder=archived
+hoodie.timeline.layout.version=1
+hoodie.table.version=6
+hoodie.table.recordkey.fields=id
+hoodie.database.name=default
+hoodie.datasource.write.partitionpath.urlencode=false
+hoodie.table.name=v6_empty
+hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
+hoodie.datasource.write.hive_style_partitioning=true
+hoodie.table.checksum=1980710965
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.commit
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.commit
new file mode 100644
index 0000000..c0f6ce3
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.commit
@@ -0,0 +1,94 @@
+{
+ "partitionToWriteStats" : {
+ "city=san_francisco" : [ {
+ "fileId" : "b271b5f8-29df-463d-ba4d-feedbe6e09ed-0",
+ "path" : "city=san_francisco/b271b5f8-29df-463d-ba4d-feedbe6e09ed-0_0-13-35_20250628002223107.parquet",
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 4,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 4,
+ "totalWriteBytes" : 436228,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : "city=san_francisco",
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 436228,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : {
+ "totalScanTime" : 0,
+ "totalUpsertTime" : 0,
+ "totalCreateTime" : 606
+ }
+ } ],
+ "city=sao_paulo" : [ {
+ "fileId" : "a3c5da68-55a5-4804-ab8b-57e75252c69f-0",
+ "path" : "city=sao_paulo/a3c5da68-55a5-4804-ab8b-57e75252c69f-0_1-13-36_20250628002223107.parquet",
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 2,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 2,
+ "totalWriteBytes" : 436064,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : "city=sao_paulo",
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 436064,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : {
+ "totalScanTime" : 0,
+ "totalUpsertTime" : 0,
+ "totalCreateTime" : 609
+ }
+ } ],
+ "city=chennai" : [ {
+ "fileId" : "03ffd613-fb74-456e-b6bb-115355d9b0ed-0",
+ "path" : "city=chennai/03ffd613-fb74-456e-b6bb-115355d9b0ed-0_2-13-37_20250628002223107.parquet",
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 2,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 2,
+ "totalWriteBytes" : 436045,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : "city=chennai",
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 436045,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : {
+ "totalScanTime" : 0,
+ "totalUpsertTime" : 0,
+ "totalCreateTime" : 606
+ }
+ } ]
+ },
+ "compacted" : false,
+ "extraMetadata" : { },
+ "operationType" : "INSERT"
+}
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.commit.requested
similarity index 100%
copy from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
copy to
crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.commit.requested
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.inflight
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.inflight
new file mode 100644
index 0000000..1b807d0
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.inflight
@@ -0,0 +1,82 @@
+{
+ "partitionToWriteStats" : {
+ "city=san_francisco" : [ {
+ "fileId" : "",
+ "path" : null,
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 0,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 4,
+ "totalWriteBytes" : 0,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : null,
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 0,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : null
+ } ],
+ "city=sao_paulo" : [ {
+ "fileId" : "",
+ "path" : null,
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 0,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 2,
+ "totalWriteBytes" : 0,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : null,
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 0,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : null
+ } ],
+ "city=chennai" : [ {
+ "fileId" : "",
+ "path" : null,
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 0,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 2,
+ "totalWriteBytes" : 0,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : null,
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 0,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : null
+ } ]
+ },
+ "compacted" : false,
+ "extraMetadata" : { },
+ "operationType" : "INSERT"
+}
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=chennai/03ffd613-fb74-456e-b6bb-115355d9b0ed-0_2-13-37_20250628002223107.parquet
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=chennai/03ffd613-fb74-456e-b6bb-115355d9b0ed-0_2-13-37_20250628002223107.parquet
new file mode 100644
index 0000000..33f7b25
Binary files /dev/null and
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=chennai/03ffd613-fb74-456e-b6bb-115355d9b0ed-0_2-13-37_20250628002223107.parquet
differ
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=san_francisco/b271b5f8-29df-463d-ba4d-feedbe6e09ed-0_0-13-35_20250628002223107.parquet
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=san_francisco/b271b5f8-29df-463d-ba4d-feedbe6e09ed-0_0-13-35_20250628002223107.parquet
new file mode 100644
index 0000000..7394806
Binary files /dev/null and
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=san_francisco/b271b5f8-29df-463d-ba4d-feedbe6e09ed-0_0-13-35_20250628002223107.parquet
differ
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=sao_paulo/a3c5da68-55a5-4804-ab8b-57e75252c69f-0_1-13-36_20250628002223107.parquet
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=sao_paulo/a3c5da68-55a5-4804-ab8b-57e75252c69f-0_1-13-36_20250628002223107.parquet
new file mode 100644
index 0000000..4071d25
Binary files /dev/null and
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=sao_paulo/a3c5da68-55a5-4804-ab8b-57e75252c69f-0_1-13-36_20250628002223107.parquet
differ
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit
new file mode 100644
index 0000000..85e86f2
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit
@@ -0,0 +1,94 @@
+{
+ "partitionToWriteStats" : {
+ "city=san_francisco" : [ {
+ "fileId" : "d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0",
+ "path" : "city=san_francisco/d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_0-13-60_20250331030642808.parquet",
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 4,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 4,
+ "totalWriteBytes" : 436406,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : "city=san_francisco",
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 436406,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : {
+ "totalScanTime" : 0,
+ "totalUpsertTime" : 0,
+ "totalCreateTime" : 491
+ }
+ } ],
+ "city=sao_paulo" : [ {
+ "fileId" : "061498b3-e8ef-42f9-9d17-a509b2779501-0",
+ "path" : "city=sao_paulo/061498b3-e8ef-42f9-9d17-a509b2779501-0_1-13-61_20250331030642808.parquet",
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 2,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 2,
+ "totalWriteBytes" : 436205,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : "city=sao_paulo",
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 436205,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : {
+ "totalScanTime" : 0,
+ "totalUpsertTime" : 0,
+ "totalCreateTime" : 491
+ }
+ } ],
+ "city=chennai" : [ {
+ "fileId" : "84e82649-b1ee-4a25-a316-17cc6872616b-0",
+ "path" : "city=chennai/84e82649-b1ee-4a25-a316-17cc6872616b-0_2-13-62_20250331030642808.parquet",
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 2,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 2,
+ "totalWriteBytes" : 436184,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : "city=chennai",
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 436184,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : {
+ "totalScanTime" : 0,
+ "totalUpsertTime" : 0,
+ "totalCreateTime" : 491
+ }
+ } ]
+ },
+ "compacted" : false,
+ "extraMetadata" : { },
+ "operationType" : "UPSERT"
+}
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit.inflight
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit.inflight
new file mode 100644
index 0000000..976e451
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit.inflight
@@ -0,0 +1,82 @@
+{
+ "partitionToWriteStats" : {
+ "city=san_francisco" : [ {
+ "fileId" : "",
+ "path" : null,
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 0,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 4,
+ "totalWriteBytes" : 0,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : null,
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 0,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : null
+ } ],
+ "city=sao_paulo" : [ {
+ "fileId" : "",
+ "path" : null,
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 0,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 2,
+ "totalWriteBytes" : 0,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : null,
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 0,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : null
+ } ],
+ "city=chennai" : [ {
+ "fileId" : "",
+ "path" : null,
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 0,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 2,
+ "totalWriteBytes" : 0,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : null,
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 0,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : null
+ } ]
+ },
+ "compacted" : false,
+ "extraMetadata" : { },
+ "operationType" : "UPSERT"
+}
\ No newline at end of file
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit.requested
similarity index 100%
copy from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
copy to
crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit.requested
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit
new file mode 100644
index 0000000..f0b09a1
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit
@@ -0,0 +1,43 @@
+{
+ "partitionToWriteStats" : {
+ "city=san_francisco" : [ {
+ "fileId" : "d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0",
+ "path" : "city=san_francisco/.d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_20250331030642808.log.1_0-26-85",
+ "cdcStats" : null,
+ "prevCommit" : "20250331030642808",
+ "numWrites" : 1,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 1,
+ "numInserts" : 0,
+ "totalWriteBytes" : 1148,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : "city=san_francisco",
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 1148,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : {
+ "totalScanTime" : 0,
+ "totalUpsertTime" : 296,
+ "totalCreateTime" : 0
+ },
+ "logVersion" : 1,
+ "logOffset" : 0,
+ "baseFile" : "d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_0-13-60_20250331030642808.parquet",
+ "logFiles" : [ ".d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_20250331030642808.log.1_0-26-85" ],
+ "recordsStats" : {
+ "val" : null
+ }
+ } ]
+ },
+ "compacted" : false,
+ "extraMetadata" : { },
+ "operationType" : "UPSERT_PREPPED"
+}
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit.inflight
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit.inflight
new file mode 100644
index 0000000..6ce54c9
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit.inflight
@@ -0,0 +1,56 @@
+{
+ "partitionToWriteStats" : {
+ "city=san_francisco" : [ {
+ "fileId" : "",
+ "path" : null,
+ "cdcStats" : null,
+ "prevCommit" : "null",
+ "numWrites" : 0,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 0,
+ "numInserts" : 0,
+ "totalWriteBytes" : 0,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : null,
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 0,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : null
+ }, {
+ "fileId" : "d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0",
+ "path" : null,
+ "cdcStats" : null,
+ "prevCommit" : "20250331030642808",
+ "numWrites" : 0,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 1,
+ "numInserts" : 0,
+ "totalWriteBytes" : 0,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : null,
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 0,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : null
+ } ]
+ },
+ "compacted" : false,
+ "extraMetadata" : { },
+ "operationType" : "UPSERT_PREPPED"
+}
\ No newline at end of file
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit.requested
similarity index 100%
copy from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
copy to
crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit.requested
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/.d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_20250331030642808.log.1_0-26-85
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/.d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_20250331030642808.log.1_0-26-85
new file mode 100644
index 0000000..95a99ea
Binary files /dev/null and
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/.d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_20250331030642808.log.1_0-26-85
differ
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_0-13-60_20250331030642808.parquet
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_0-13-60_20250331030642808.parquet
new file mode 100644
index 0000000..4262432
Binary files /dev/null and
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_0-13-60_20250331030642808.parquet
differ
diff --git
a/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.commit
b/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.commit
new file mode 100644
index 0000000..0967ef4
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.commit
@@ -0,0 +1 @@
+{}
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
b/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.commit.requested
similarity index 100%
copy from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
copy to
crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.commit.requested
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
b/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.inflight
similarity index 100%
copy from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
copy to
crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.inflight
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
b/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/hoodie.properties
similarity index 100%
copy from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
copy to
crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/hoodie.properties
diff --git
a/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.commit
b/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.commit
new file mode 100644
index 0000000..4d59d6a
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.commit
@@ -0,0 +1,6 @@
+{
+ "partitionToWriteStats" : { },
+ "compacted" : false,
+ "extraMetadata" : { },
+ "operationType" : "BULK_INSERT"
+}
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
b/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.commit.requested
similarity index 100%
copy from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
copy to
crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.commit.requested
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
b/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.inflight
similarity index 100%
copy from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
copy to
crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.inflight
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
b/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/hoodie.properties
similarity index 100%
copy from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
copy to
crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/hoodie.properties
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit
b/crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.commit
similarity index 100%
rename from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit
rename to
crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.commit
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
b/crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.commit.requested
similarity index 100%
rename from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
rename to
crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.commit.requested
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
b/crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.inflight
similarity index 100%
rename from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
rename to
crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.inflight
diff --git
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
b/crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/hoodie.properties
similarity index 100%
rename from
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
rename to
crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/hoodie.properties
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index f3a0601..b22a30d 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -337,6 +337,8 @@ mod tests {
use datafusion::logical_expr::BinaryExpr;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
+ use hudi_core::metadata::meta_field::MetaField;
+ use hudi_test::assert_arrow_field_names_eq;
use hudi_test::SampleTable::{
V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned,
V6SimplekeygenHivestyleNoMetafields,
V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
@@ -358,13 +360,16 @@ mod tests {
}
#[tokio::test]
- async fn test_get_empty_schema_from_empty_table() {
+ async fn test_get_create_schema_from_empty_table() {
let table_provider =
HudiDataSource::new_with_options(V6Empty.path_to_cow().as_str(),
empty_options())
.await
.unwrap();
let schema = table_provider.schema();
- assert!(schema.fields().is_empty());
+ assert_arrow_field_names_eq!(
+ schema,
+ [MetaField::field_names(), vec!["id", "name", "isActive"]].concat()
+ );
}
async fn register_test_table_with_session<I, K, V>(
diff --git a/crates/test/src/util.rs b/crates/test/src/util.rs
index 87c4889..98a84cd 100644
--- a/crates/test/src/util.rs
+++ b/crates/test/src/util.rs
@@ -65,3 +65,35 @@ pub fn set_fixed_timezone(tz: &str) {
pub fn reset_timezone() {
env::remove_var("TZ");
}
+
+#[macro_export]
+macro_rules! assert_arrow_field_names_eq {
+ ($schema:expr, $expected:expr) => {{
+ let actual: Vec<_> = $schema.fields().iter().map(|f| f.name()).collect();
+ assert_eq!(
+ actual, $expected,
+ "Schema field names do not match expected fields.\nActual: {:?}\nExpected: {:?}",
+ actual, $expected
+ );
+ }};
+}
+
+#[macro_export]
+macro_rules! assert_avro_field_names_eq {
+ ($schema:expr, $expected:expr) => {{
+ let schema_json_value = serde_json::from_str::<serde_json::Value>($schema).unwrap();
+ let actual = schema_json_value
+ .get("fields")
+ .unwrap()
+ .as_array()
+ .unwrap()
+ .iter()
+ .map(|f| f.get("name").unwrap().as_str().unwrap())
+ .collect::<Vec<_>>();
+ assert_eq!(
+ actual, $expected,
+ "Schema field names do not match expected fields.\nActual: {:?}\nExpected: {:?}",
+ actual, $expected
+ );
+ }};
+}